Zoie StreamDataProvider类作用

1、Zoie 在运行时需要实例化的对象有:StreamDataProvider、ZoieSystem(ZoieSystem 本身也是一个消费者)。

2、在StreamDataProvider中积累数据,然后调用StreamDataProvider的start()方法,该方法会启动一个消费StreamDataProvider中数据的线程DataThread<V> 。

3、然后DataThread<V>会去处理StreamDataProvider中的数据:通过不断调用StreamDataProvider.next()方法来取得数据,当取到一定的数量(即batchSize)后,会调用ZoieSystem的consume()方法,将这一部分数据传给ZoieSystem来进行索引。

package proj.zoie.impl.indexing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;

import org.apache.log4j.Logger;

import proj.zoie.api.DataConsumer;
import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.api.DataProvider;
import proj.zoie.api.ZoieException;
import proj.zoie.mbean.DataProviderAdminMBean;
13/**\
14* StreamDataProvider中的start()方法,会启动一个来消费StreamDataProvider中数据的线程DataThread,DataThread通过
15* 不断调用StreamDataProvider.next()来取得数据,缓存在自己的List<DateEvent>中,当到达一定的数量(_batchSize)时统一交给ZoieSystem来索引消费
16* @authorcctv
17*
18* @param<V>
19*/
20publicabstractclassStreamDataProvider<V>implementsDataProvider<V>,DataProviderAdminMBean{
21    privatestaticfinalLogger log =Logger.getLogger(StreamDataProvider.class);
22    
23    privateint_batchSize;      //每次处理的批量,DataProvider的处理线程DataThread中缓存的数据的数量,
24    privateDataConsumer<V>_consumer; //消费数据的消费者,一般注入的是ZoieSystem(本身是个消费者)
25    privateDataThread<V>_thread;     //消费DataProvider中数据的线程
26    
27    publicStreamDataProvider()
28    {
29        _batchSize=1;
30        _consumer=null;
31    }
32    
33    publicvoidsetDataConsumer(DataConsumer<V>consumer)
34    {
35      _consumer=consumer;    
36    }
37
38    publicabstractDataEvent<V>next();   //实现者继承的方法,取数据,在这个方法里面定义自定义的数据来源
39    
40    protectedabstractvoidreset();       
41    
42    publicintgetBatchSize() {
43        return_batchSize;
44    }
45
46    publicvoidpause() {
47        if(_thread !=null)
48        {
49            _thread.pauseDataFeed();
50        }
51    }
52
53    publicvoidresume() {
54        if(_thread !=null)
55        {
56            _thread.resumeDataFeed();
57        }
58    }
59
60    publicvoidsetBatchSize(intbatchSize) {
61        _batchSize=Math.max(1, batchSize);
62    }
63    
64    publicvoidstop()
65    {
66        if(_thread!=null&&_thread.isAlive())
67        {
68            _thread.terminate();
69            try{
70                _thread.join();
71            } catch(InterruptedException e) {
72                log.warn("stopping interrupted");
73            }
74        }
75    }
76
77    publicvoidstart() {
78        if(_thread==null||!_thread.isAlive())
79        {
80            reset();
81            _thread =newDataThread<V>(this);
82            _thread.start();
83        }
84    }
85    
86    publicvoidsyncWthVersion(longtimeInMillis, longversion) throwsZoieException
87    {
88      _thread.syncWthVersion(timeInMillis, version);
89    }
90    /**
91     * 消费中DataProvider数据的线程 。
92     * 每次由DataProvider启动一个该线程的实例(假如当前没有该线程的实例)来将DataProvider中的数据消费到ZoieSystem中
93     * @authorcctv
94     *
95     * @param<V>
96     */
97    privatestaticfinalclassDataThread<V>extendsThread
98    {
99        privateCollection<DataEvent<V>>_batch;
100        privatelong_currentVersion;   //已经从DataProvider取出来的DateEvent<V>的最大的_currentVersion
101        privatefinalStreamDataProvider<V>_dataProvider;
102        privateboolean_paused;       //通过该参数来暂停该线程的运行
103        privateboolean_stop;         //通过该参数来停止该线程的运行
104        
105        DataThread(StreamDataProvider<V>dataProvider)
106        {
107            super("Stream DataThread");
108            setDaemon(false);
109            _dataProvider =dataProvider;
110            _currentVersion =0L;
111            _paused =false;
112            _stop =false;
113            _batch =newLinkedList<DataEvent<V>>();
114        }
115        
116        voidterminate()
117        {
118            synchronized(this)
119            {
120                _stop =true;
121               this.notifyAll();
122            }
123        }
124        
125        voidpauseDataFeed()
126        {
127            synchronized(this)
128            {
129                _paused =true;
130            }
131        }
132        
133        voidresumeDataFeed()
134        {
135            synchronized(this)
136            {
137                _paused =false;
138                this.notifyAll();
139            }
140        }
141        //当从DataProvider中取到了_batchSize个数据 或者从DataProvider取到了null(DataProvider中无数据了)时候,
142        //将这些数据flush()到ZoieSystem中
143        privatevoidflush()
144        {
145            //FLUSH
146            Collection<DataEvent<V>>tmp;
147            tmp =_batch;
148            _batch =newLinkedList<DataEvent<V>>();
149
150            try
151            {
152              if(_dataProvider._consumer!=null)
153              {
154                  _dataProvider._consumer.consume(tmp);
155              }
156            }
157            catch(ZoieException e)
158            {
159              log.error(e.getMessage(), e);
160            }
161        }
162        
163        publiclonggetCurrentVersion()
164        {
165            synchronized(this)
166            {
167              return_currentVersion;
168            }
169        }
170        
171        publicvoidsyncWthVersion(longtimeInMillis, longversion) throwsZoieException
172        {
173          longnow =System.currentTimeMillis();
174          longdue =now +timeInMillis;
175          synchronized(this)
176          {
177            while(_currentVersion <version)
178            {
179              if(now >due)
180              {
181                thrownewZoieException("sync timed out");
182              }
183              try
184              {
185                this.wait(due -now);
186              }
187              catch(InterruptedException e)
188              {
189                  log.warn(e.getMessage(), e);
190              }
191              now =System.currentTimeMillis();
192            }
193          }
194        }
195        //消费StreamDataProvider中数据的主要方法
196        publicvoidrun()
197        {
198            //可以通过_stop参数来flush数据并停止线程
199            while(!_stop)
200            {
201                //暂停,可以通过StreamDataProvider中相应的方法来暂停该取数据进程的执行 。
202                synchronized(this)
203                {
204                    while(_paused &&!_stop)
205                    {
206                        try{
207                            this.wait();
208                        } catch(InterruptedException e) {
209                            continue;
210                        }
211                    }
212                }
213                if(!_stop)
214                {
215                    DataEvent<V>data =_dataProvider.next();    //调用_dataProvider.next()取得数据,然后现在自己的list中缓存起来
216                    if(data!=null)
217                    {
218                      synchronized(this)
219                      {
220                        _batch.add(data);
221                        if(_batch.size()>=_dataProvider._batchSize)    //达到批量处理数量缓存起来
222                        {
223                            flush();
224                        }
225                        _currentVersion=Math.max(_currentVersion, data.getVersion());
226                        this.notifyAll();
227                      }
228                    }
229                    else
230                    {
231                      synchronized(this)
232                      {
233                        flush();
234                        _stop=true;
235                        return;
236                      }
237                    }
238                }
239            }
240        }
241    }
242}