Zoie StreamDataProvider类作用
1、zoie在运行是需要实例化的对象有:StreamDataProvider、ZoieSystem(也是一个消费者) 。
2、在StreamDataProvider中积累数据,然后调用StreamDataProvider的start()方法,该方法会启动一个消费StreamDataProvider中数据的线程DataThread<V> 。
3、然后DataThread<V>会去处理StreamDataProvider中的数据,通过不断调用StreamDataProvider.next()方法来取得数据,当取到一定的数量(就是beachSize)后会调用ZoieSystem的consume()方法,将这一部分数据
传给ZoieSystem来进行索引 。
1packageproj.zoie.impl.indexing;
2
3 importjava.util.Collection;
4importjava.util.LinkedList;
5
6importorg.apache.log4j.Logger;
7
8importproj.zoie.api.DataConsumer;
9importproj.zoie.api.DataProvider;
10importproj.zoie.api.ZoieException;
11importproj.zoie.api.DataConsumer.DataEvent;
12importproj.zoie.mbean.DataProviderAdminMBean;
13/**\
14* StreamDataProvider中的start()方法,会启动一个来消费StreamDataProvider中数据的线程DataThread,DataThread通过
15* 不断调用StreamDataProvider.next()来取得数据,缓存在自己的List<DateEvent>中,当到达一定的数量(_batchSize)时统一交给ZoieSystem来索引消费
16* @authorcctv
17*
18* @param<V>
19*/
20publicabstractclassStreamDataProvider<V>implementsDataProvider<V>,DataProviderAdminMBean{
21 privatestaticfinalLogger log =Logger.getLogger(StreamDataProvider.class);
22
23 privateint_batchSize; //每次处理的批量,DataProvider的处理线程DataThread中缓存的数据的数量,
24 privateDataConsumer<V>_consumer; //消费数据的消费者,一般注入的是ZoieSystem(本身是个消费者)
25 privateDataThread<V>_thread; //消费DataProvider中数据的线程
26
27 publicStreamDataProvider()
28 {
29 _batchSize=1;
30 _consumer=null;
31 }
32
33 publicvoidsetDataConsumer(DataConsumer<V>consumer)
34 {
35 _consumer=consumer;
36 }
37
38 publicabstractDataEvent<V>next(); //实现者继承的方法,取数据,在这个方法里面定义自定义的数据来源
39
40 protectedabstractvoidreset();
41
42 publicintgetBatchSize() {
43 return_batchSize;
44 }
45
46 publicvoidpause() {
47 if(_thread !=null)
48 {
49 _thread.pauseDataFeed();
50 }
51 }
52
53 publicvoidresume() {
54 if(_thread !=null)
55 {
56 _thread.resumeDataFeed();
57 }
58 }
59
60 publicvoidsetBatchSize(intbatchSize) {
61 _batchSize=Math.max(1, batchSize);
62 }
63
64 publicvoidstop()
65 {
66 if(_thread!=null&&_thread.isAlive())
67 {
68 _thread.terminate();
69 try{
70 _thread.join();
71 } catch(InterruptedException e) {
72 log.warn("stopping interrupted");
73 }
74 }
75 }
76
77 publicvoidstart() {
78 if(_thread==null||!_thread.isAlive())
79 {
80 reset();
81 _thread =newDataThread<V>(this);
82 _thread.start();
83 }
84 }
85
86 publicvoidsyncWthVersion(longtimeInMillis, longversion) throwsZoieException
87 {
88 _thread.syncWthVersion(timeInMillis, version);
89 }
90 /**
91 * 消费中DataProvider数据的线程 。
92 * 每次由DataProvider启动一个该线程的实例(假如当前没有该线程的实例)来将DataProvider中的数据消费到ZoieSystem中
93 * @authorcctv
94 *
95 * @param<V>
96 */
97 privatestaticfinalclassDataThread<V>extendsThread
98 {
99 privateCollection<DataEvent<V>>_batch;
100 privatelong_currentVersion; //已经从DataProvider取出来的DateEvent<V>的最大的_currentVersion
101 privatefinalStreamDataProvider<V>_dataProvider;
102 privateboolean_paused; //通过该参数来暂停该线程的运行
103 privateboolean_stop; //通过该参数来停止该线程的运行
104
105 DataThread(StreamDataProvider<V>dataProvider)
106 {
107 super("Stream DataThread");
108 setDaemon(false);
109 _dataProvider =dataProvider;
110 _currentVersion =0L;
111 _paused =false;
112 _stop =false;
113 _batch =newLinkedList<DataEvent<V>>();
114 }
115
116 voidterminate()
117 {
118 synchronized(this)
119 {
120 _stop =true;
121 this.notifyAll();
122 }
123 }
124
125 voidpauseDataFeed()
126 {
127 synchronized(this)
128 {
129 _paused =true;
130 }
131 }
132
133 voidresumeDataFeed()
134 {
135 synchronized(this)
136 {
137 _paused =false;
138 this.notifyAll();
139 }
140 }
141 //当从DataProvider中取到了_batchSize个数据 或者从DataProvider取到了null(DataProvider中无数据了)时候,
142 //将这些数据flush()到ZoieSystem中
143 privatevoidflush()
144 {
145 //FLUSH
146 Collection<DataEvent<V>>tmp;
147 tmp =_batch;
148 _batch =newLinkedList<DataEvent<V>>();
149
150 try
151 {
152 if(_dataProvider._consumer!=null)
153 {
154 _dataProvider._consumer.consume(tmp);
155 }
156 }
157 catch(ZoieException e)
158 {
159 log.error(e.getMessage(), e);
160 }
161 }
162
163 publiclonggetCurrentVersion()
164 {
165 synchronized(this)
166 {
167 return_currentVersion;
168 }
169 }
170
171 publicvoidsyncWthVersion(longtimeInMillis, longversion) throwsZoieException
172 {
173 longnow =System.currentTimeMillis();
174 longdue =now +timeInMillis;
175 synchronized(this)
176 {
177 while(_currentVersion <version)
178 {
179 if(now >due)
180 {
181 thrownewZoieException("sync timed out");
182 }
183 try
184 {
185 this.wait(due -now);
186 }
187 catch(InterruptedException e)
188 {
189 log.warn(e.getMessage(), e);
190 }
191 now =System.currentTimeMillis();
192 }
193 }
194 }
195 //消费StreamDataProvider中数据的主要方法
196 publicvoidrun()
197 {
198 //可以通过_stop参数来flush数据并停止线程
199 while(!_stop)
200 {
201 //暂停,可以通过StreamDataProvider中相应的方法来暂停该取数据进程的执行 。
202 synchronized(this)
203 {
204 while(_paused &&!_stop)
205 {
206 try{
207 this.wait();
208 } catch(InterruptedException e) {
209 continue;
210 }
211 }
212 }
213 if(!_stop)
214 {
215 DataEvent<V>data =_dataProvider.next(); //调用_dataProvider.next()取得数据,然后现在自己的list中缓存起来
216 if(data!=null)
217 {
218 synchronized(this)
219 {
220 _batch.add(data);
221 if(_batch.size()>=_dataProvider._batchSize) //达到批量处理数量缓存起来
222 {
223 flush();
224 }
225 _currentVersion=Math.max(_currentVersion, data.getVersion());
226 this.notifyAll();
227 }
228 }
229 else
230 {
231 synchronized(this)
232 {
233 flush();
234 _stop=true;
235 return;
236 }
237 }
238 }
239 }
240 }
241 }
242}