private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // Marked `private[spark]` for access in tests.
  private[spark] val listeners = new CopyOnWriteArrayList[L]

  /** Register a listener so it receives all subsequently posted events. */
  final def addListener(listener: L): Unit = {
    listeners.add(listener)
  }

  /** Unregister a previously registered listener; no-op if it was never added. */
  final def removeListener(listener: L): Unit = {
    listeners.remove(listener)
  }

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
  final def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val it = listeners.iterator
    while (it.hasNext) {
      val listener = it.next()
      try {
        doPostEvent(listener, event)
      } catch {
        case NonFatal(e) =>
          // A misbehaving listener must not break delivery to the remaining listeners.
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      }
    }
  }

  /**
   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
   * thread for all listeners.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  /** Return every registered listener whose runtime class is exactly `T` (no subclass match). */
  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val klass = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == klass).map(_.asInstanceOf[T]).toSeq
  }
}
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()

// Bounded FIFO buffer between event producers (`post`) and the single dispatch thread.
// When full, `offer` fails and the event is dropped rather than blocking the caller.
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
// Single daemon thread that drains `eventQueue` and dispatches each event to all listeners.
// (The thread is named after `name`, i.e. "SparkListenerBus".)
private val listenerThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    LiveListenerBus.withinListenerThread.withValue(true) {
      while (true) {
        // One permit is released per enqueued event (plus one extra on stop), so this
        // blocks until there is either an event to process or `stop` has been called.
        eventLock.acquire()
        self.synchronized {
          processingEvent = true
        }
        try {
          val event = eventQueue.poll
          if (event == null) {
            // Get out of the while loop and shutdown the daemon thread
            if (!stopped.get) {
              throw new IllegalStateException("Polling `null` from eventQueue means" +
                " the listener bus has been stopped. So `stopped` must be true")
            }
            return
          }
          postToAll(event)
        } finally {
          // Always clear the flag, even if a listener threw, so `stop` can observe quiescence.
          self.synchronized {
            processingEvent = false
          }
        }
      }
    }
  }
}
/**
 * Enqueue an event for asynchronous delivery to all registered listeners.
 * If the bus has stopped or the bounded queue is full, the event is dropped
 * (and the drop is counted / periodically logged) instead of blocking the caller.
 */
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get) {
    // Drop further events to make `listenerThread` exit ASAP
    logError(s"$name has already stopped! Dropping event $event")
    return
  }
  // Try to append the incoming event to the event queue (non-blocking).
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    // The queue was full, so the event is dropped.
    onDropEvent(event)
    droppedEventsCounter.incrementAndGet()
  }

  val droppedEvents = droppedEventsCounter.get
  if (droppedEvents > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
          new java.util.Date(prevLastReportTimestamp))
      }
    }
  }
}
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus(this)

// After the bus is created, the various listeners are instantiated and registered on it,
// and calling `listenerBus.start()` kicks off the event-dispatch loop.
/**
 * Instantiates (via reflection) and registers any listeners named in the
 * `spark.extraListeners` configuration, then starts the listener bus.
 */
private def setupAndStartListenerBus(): Unit = {
  // Use reflection to instantiate listeners specified via `spark.extraListeners`
  try {
    val listenerClassNames: Seq[String] =
      conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
    for (className <- listenerClassNames) {
      // For each extra SparkListener class configured, instantiate it reflectively
      // and register it on the listener bus.
      // Use reflection to find the right constructor
      val constructors = {
        val listenerClass = Utils.classForName(className)
        listenerClass.getConstructors
          .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
      }
      val constructorTakingSparkConf = constructors.find { c =>
        c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
      }
      lazy val zeroArgumentConstructor = constructors.find { c =>
        c.getParameterTypes.isEmpty
      }
      val listener: SparkListenerInterface = {
        if (constructorTakingSparkConf.isDefined) {
          constructorTakingSparkConf.get.newInstance(conf)
        } else if (zeroArgumentConstructor.isDefined) {
          zeroArgumentConstructor.get.newInstance()
        } else {
          // Elided ("...") in the original excerpt: a listener with no usable
          // constructor is a configuration error, so fail fast.
          throw new SparkException(s"$className did not have a zero-argument constructor or a" +
            " single-argument constructor that accepts SparkConf.")
        }
      }
      listenerBus.addListener(listener)
      logInfo(s"Registered listener $className")
    }
  } catch {
    // Elided ("...") in the original excerpt: if any listener fails to register,
    // shut the context down and surface the failure to the caller.
    case e: Exception =>
      try {
        stop()
      } finally {
        throw new SparkException(s"Exception when registering SparkListener", e)
      }
  }
  listenerBus.start()
  _listenerBusStarted = true
}
/**
 * Stop the listener bus: signal the dispatch thread to exit and wait for it to terminate.
 * Only the first successful call performs the shutdown; later calls are silent no-ops.
 * Throws `IllegalStateException` if the bus was never started.
 */
def stop(): Unit = {
  if (!started.get()) {
    throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
  }
  val isFirstStop = stopped.compareAndSet(false, true)
  if (isFirstStop) {
    // Release one extra permit so that listenerThread wakes up, polls `null` from the
    // drained `eventQueue`, and thereby learns that `stop` was called.
    eventLock.release()
    listenerThread.join()
  }
  // Otherwise another thread already performed the shutdown — keep quiet.
}