
Spark 2.x Master Source Code Analysis

白手套
Published 2016/11/29 17:27

In Spark 2.x the Master is a process, and at the same time a message-communication endpoint object with its own message loop: in other words, an RPC communication entity.

The Master process is launched by the start-master.sh script, which defines the class to start, the port, the IP, and so on. In Spark 2.x the class started for the Master is class = org.apache.spark.deploy.master.Master. If no host or ports are specified, the defaults are SPARK_MASTER_HOST = the current host, SPARK_MASTER_PORT = 7077, and SPARK_MASTER_WEBUI_PORT = 8080.

Running ./start-master.sh therefore runs the main method of org.apache.spark.deploy.master.Master:

def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}

The first line, Utils.initDaemon(log), is clearly log-related, so we can skip it.

The second line is the val conf = new SparkConf we write all the time in our own programs.

val args = new MasterArguments(argStrings, conf) parses the startup arguments.

Stepping inside, MasterArguments contains little code: it is essentially parsing and defaulting of parameters such as the 8080 web UI port, the 7077 RPC port, and host (resolved to the local machine's address), plus loading spark-defaults.conf. Back in main, rpcEnv.awaitTermination() simply blocks until the RpcEnv and its endpoints exit. A simplified sketch of the parsing follows.
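To make the precedence concrete, here is a hedged, self-contained sketch of what MasterArguments does (defaults, then SPARK_MASTER_* environment variables, then command-line flags); the class name and the parsing below are illustrative, not Spark's actual parser:

class MasterArgumentsSketch(argStrings: Array[String]) {
  // Defaults, then environment overrides (the real class also reads spark-defaults.conf).
  var host: String =
    sys.env.getOrElse("SPARK_MASTER_HOST", java.net.InetAddress.getLocalHost.getHostName)
  var port: Int = sys.env.get("SPARK_MASTER_PORT").map(_.toInt).getOrElse(7077)
  var webUiPort: Int = sys.env.get("SPARK_MASTER_WEBUI_PORT").map(_.toInt).getOrElse(8080)

  // Command-line flags win over the environment.
  argStrings.sliding(2, 2).foreach {
    case Array("--host", v)       => host = v
    case Array("--port", v)       => port = v.toInt
    case Array("--webui-port", v) => webUiPort = v.toInt
    case _                        => // ignore anything else in this sketch
  }
}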
The most important call is:

val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
The essence of this method is three things: create the RpcEnv, register the Master (an RpcEndpoint), and have the Master send a message to itself (askWithRetry sends a request and waits for the reply; if none arrives within a certain time it retries a bounded number of times).

val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

RpcEnv.create builds the RPC environment. The parameters are the system name sparkMaster, the master host (Hadoop1 in this walkthrough), and port 7077:
def create(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean = false): RpcEnv = {
  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
  new NettyRpcEnvFactory().create(config)
}
Internally, create builds a case class instance with conf -> SparkConf, name -> sparkMaster, host -> Hadoop1, port -> 7077, and then calls NettyRpcEnvFactory.create(config).
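For reference, RpcEnvConfig itself is just a plain carrier of these parameters; in Spark 2.0.x it looks roughly like this (field order matching the call above):

private[spark] case class RpcEnvConfig(
    conf: SparkConf,
    name: String,
    host: String,
    port: Int,
    securityManager: SecurityManager,
    clientMode: Boolean)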
 
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}
Using JavaSerializer is safe here; inside NettyRpcEnv every message is serialized and deserialized with it.
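As a quick illustration of that serializer (the JavaSerializer API is real Spark; the Ping message type is made up for the demo), a round trip looks like this:

import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

case class Ping(id: Long) // stand-in for an RPC message; case classes are Serializable

object SerializerDemo {
  def main(args: Array[String]): Unit = {
    val instance = new JavaSerializer(new SparkConf).newInstance()
    val bytes: ByteBuffer = instance.serialize(Ping(42L)) // what the sending side does
    val back = instance.deserialize[Ping](bytes)          // what the receiving side does
    assert(back == Ping(42L))
  }
}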
 
nettyEnv.startServer() is where the Netty internals begin; put plainly, it starts the RPC server:
def startServer(port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(host, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
transportContext is a field created when NettyRpcEnv is instantiated; it is the context object that manages network transport and creates the MessageEncoder, MessageDecoder, TransportClientFactory, and TransportServer.

TransportServer configures and starts an RPC server. Constructing a TransportServer calls its init method with the host and port; roughly speaking, init binds them on a ServerBootstrap and obtains a ChannelFuture.

At this point an RpcServer has been created, which effectively brings the Master up: the IP and port are the Master's own parameters, so the Master is the server side of the RPC.
 
 
NettyRpcEnv.startServer(port) starts the RpcServer, with Netty underneath.

Instantiating NettyRpcEnv creates several object instances by default:

private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

TransportContext is the object responsible for network transport in Spark 2.x. It holds the following:

RpcHandler // handles IO events in Spark: receives the messages sent by an RpcEndpointRef and dispatches them through the dispatcher; the concrete implementation is NettyRpcHandler
MessageEncoder // encodes outgoing messages
MessageDecoder // decodes incoming messages
TransportClientFactory // TransportContext creates this factory, whose main job is producing TransportClient instances; the TransportClient is the client an RpcEndpointRef uses to send messages to an RpcEndpoint
TransportServer // TransportContext creates the RPC server from the port and the RpcHandler, and starts it

server = new TransportServer(): instantiating it runs an internal init method that creates and starts the RpcServer through Netty:
 
private void init(String hostToBind, int portToBind) {
    IOMode ioMode = IOMode.valueOf(this.conf.ioMode());
    // Event loop group that handles IO: it accepts incoming connections and
    // processes the messages on connections registered with it.
    EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, this.conf.serverThreads(), "shuffle-server");
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
        this.conf.preferDirectBufs(), true, this.conf.serverThreads());
    // A Netty program usually begins with a Bootstrap: it configures the whole
    // program and wires the pieces together.
    this.bootstrap = new ServerBootstrap()
        .group(bossGroup, bossGroup)
        .channel(NettyUtils.getServerChannelClass(ioMode))
        .option(ChannelOption.ALLOCATOR, allocator)
        .childOption(ChannelOption.ALLOCATOR, allocator);
    if (this.conf.backLog() > 0) {
        this.bootstrap.option(ChannelOption.SO_BACKLOG, this.conf.backLog());
    }
    if (this.conf.receiveBuf() > 0) {
        this.bootstrap.childOption(ChannelOption.SO_RCVBUF, this.conf.receiveBuf());
    }
    if (this.conf.sendBuf() > 0) {
        this.bootstrap.childOption(ChannelOption.SO_SNDBUF, this.conf.sendBuf());
    }
    // Install the handlers. Handlers process events such as connections and
    // inbound data. Spark's handler is TransportChannelHandler, which reads and
    // processes the messages clients send. The ChannelInitializer registers our
    // handlers on the ChannelPipeline: the pipeline is the channel a message
    // flows through, and any handler on it that matches the message is
    // triggered to perform the corresponding work.
    this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel ch) throws Exception {
            RpcHandler rpcHandler = TransportServer.this.appRpcHandler;
            for (TransportServerBootstrap bootstrap : TransportServer.this.bootstraps) {
                rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
            }
            TransportServer.this.context.initializePipeline(ch, rpcHandler);
        }
    });
    InetSocketAddress address = hostToBind == null
        ? new InetSocketAddress(portToBind)
        : new InetSocketAddress(hostToBind, portToBind);
    // Bind the address and start the server.
    this.channelFuture = this.bootstrap.bind(address);
    this.channelFuture.syncUninterruptibly();
    this.port = ((InetSocketAddress) this.channelFuture.channel().localAddress()).getPort();
    this.logger.debug("Shuffle server started on port :" + this.port);
}
That is the Netty-level source underpinning Spark 2.x.
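To see the same skeleton without Spark around it, here is a minimal standalone Netty echo server in Scala: an event-loop group, a ServerBootstrap, a ChannelInitializer that fills the pipeline, then bind(). The Netty API calls are real; the echo handler and port are placeholders standing in for TransportChannelHandler:

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter, ChannelInitializer}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel

object MiniServer {
  def main(args: Array[String]): Unit = {
    val group = new NioEventLoopGroup() // plays the role of the shared boss/worker group
    try {
      val bootstrap = new ServerBootstrap()
        .group(group, group) // TransportServer likewise passes one group twice
        .channel(classOf[NioServerSocketChannel])
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit =
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter {
              override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit =
                ctx.writeAndFlush(msg) // echo back; Spark installs encoder/decoder/handler here
            })
        })
      val future = bootstrap.bind(9999).syncUninterruptibly() // like bootstrap.bind(address)
      future.channel().closeFuture().sync()                   // block until the server closes
    } finally {
      group.shutdownGracefully()
    }
  }
}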
 
Now back to the communication path. TransportServer.init() called initializePipeline, i.e. TransportContext.initializePipeline():
public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler) {
    try {
        TransportChannelHandler e = this.createChannelHandler(channel, channelRpcHandler);
        channel.pipeline()
            .addLast("encoder", this.encoder)
            .addLast("frameDecoder", NettyUtils.createFrameDecoder())
            .addLast("decoder", this.decoder)
            .addLast("idleStateHandler", new IdleStateHandler(0, 0, this.conf.connectionTimeoutMs() / 1000))
            .addLast("handler", e);
        return e;
    } catch (RuntimeException var4) {
        this.logger.error("Error while initializing Netty pipeline", var4);
        throw var4;
    }
}
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
        (long) this.conf.connectionTimeoutMs(), this.closeIdleConnections);
}
The code above builds the TransportChannelHandler. Its channelRead0 method routes every inbound message:
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
    if (request instanceof RequestMessage) {
        this.requestHandler.handle((RequestMessage) request);
    } else {
        this.responseHandler.handle((ResponseMessage) request);
    }
}
channelRead0 shows that the data Netty receives is, after decoding, handed to the requestHandler. Continuing into its handle() method:
public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
        this.processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
        this.processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
        this.processOneWayMessage((OneWayMessage) request);
    } else {
        if (!(request instanceof StreamRequest)) {
            throw new IllegalArgumentException("Unknown request type: " + request);
        }
        this.processStreamRequest((StreamRequest) request);
    }
}
private void processStreamRequest(StreamRequest req) {
    String client = NettyUtils.getRemoteAddress(this.channel);

    ManagedBuffer buf;
    try {
        buf = this.streamManager.openStream(req.streamId);
    } catch (Exception var5) {
        this.logger.error(String.format("Error opening stream %s for request from %s",
            req.streamId, client), var5);
        this.respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(var5)));
        return;
    }

    if (buf != null) {
        this.respond(new StreamResponse(req.streamId, buf.size(), buf));
    } else {
        this.respond(new StreamFailure(req.streamId,
            String.format("Stream '%s' was not found.", req.streamId)));
    }
}
For RPC messages, processRpcRequest:
 
 
private void processRpcRequest(final RpcRequest req) {
    try {
        this.rpcHandler.receive(this.reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
            public void onSuccess(ByteBuffer response) {
                TransportRequestHandler.this.respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
            }

            public void onFailure(Throwable e) {
                TransportRequestHandler.this.respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
            }
        });
    } catch (Exception var6) {
        this.logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, var6);
        this.respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(var6)));
    } finally {
        req.body().release();
    }
}
Finally the payload is handed to NettyRpcHandler's receive method:
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
which then calls dispatcher.postRemoteMessage():
/** Posts a message sent by a remote endpoint. */
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  val rpcCallContext =
    new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
}
In the end the message is posted into the endpoint's Inbox queue via inbox.post(message); on the consuming side, a thread pool in the Dispatcher runs MessageLoop threads that call inbox.process to handle the queued messages:
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  error.foreach(callbackIfStopped)
}

At this point the call flow for handling a Netty request is:

TransportChannelHandler --> TransportRequestHandler --> RpcHandler [NettyRpcHandler] --> Dispatcher --> message queue

That completes the source analysis of RpcEnv.create() and the Netty processing flow.
 
Next comes RpcEndpoint registration:
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
  new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
setupEndpoint is called with the endpoint name (ENDPOINT_NAME, i.e. Master) and a new Master. In other words, only now is the Master object itself created; the listening set up so far was based purely on the IP and port. new Master is little more than field initialization: a Map structure of drivers, a Map structure of workers, and a Map structure of applications (drivers, workers, and applications all register with the Master), plus other ports and addresses, the persistenceEngine, and so on. Internally, setupEndpoint calls registerRpcEndpoint on the NettyRpcEnv's Dispatcher:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}
 
First an RpcEndpointAddress is built; it holds the Master's IP and port plus the RpcEndpoint's name, Master. From that address, the SparkConf, and the NettyRpcEnv, a NettyRpcEndpointRef is constructed. Then an EndpointData is built from the RpcEndpoint name, the RpcEndpoint (the Master), and the RpcEndpointRef; constructing an EndpointData creates an Inbox. The EndpointData is stored into private val endpoints = new ConcurrentHashMap[String, EndpointData], the (RpcEndpoint, RpcEndpointRef) pair into private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef], and finally the EndpointData is offered into private val receivers = new LinkedBlockingQueue[EndpointData].
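To see the whole register-and-ask cycle in one place, here is a hedged sketch of a toy endpoint. Note that org.apache.spark.rpc is private[spark] in 2.x, so this compiles only inside the org.apache.spark package; it illustrates the pattern the Master follows, not a public API, and the names, host, and port are made up:

package org.apache.spark.demo // needed because the rpc package is private[spark]

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc._

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = println("EchoEndpoint started") // first Inbox message
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

object EchoDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val env = RpcEnv.create("demo", "localhost", 9998, conf, new SecurityManager(conf))
    val ref = env.setupEndpoint("echo", new EchoEndpoint(env)) // registerRpcEndpoint path
    println(ref.askWithRetry[String]("hi"))                    // prints "echo: hi"
    env.shutdown()
  }
}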
What remains is consuming the data in the receivers queue. The Dispatcher owns a thread pool that starts MessageLoop threads; a MessageLoop takes an EndpointData from receivers and calls endpointData.inbox.process() to handle the messages queued in that inbox. By default an Inbox puts an OnStart message into its queue when it is instantiated, so the RpcEndpoint's onStart method is invoked first:
private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}
 
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
The method above is a textbook producer-consumer pattern; a self-contained sketch of the same structure follows.
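Here is the same pattern in miniature: a blocking queue of work, a fixed pool of loop threads, and a PoisonPill sentinel for shutdown. All names are illustrative, not Spark's:

import java.util.concurrent.{Executors, LinkedBlockingQueue}

object LoopDemo {
  private case object PoisonPill
  private val queue = new LinkedBlockingQueue[Any]()

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    for (_ <- 0 until 2) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          while (true) {
            queue.take() match {
              case PoisonPill =>
                queue.offer(PoisonPill) // put it back so the other loops can exit too
                return
              case msg =>
                println(s"processed: $msg") // stand-in for inbox.process(...)
            }
          }
        }
      })
    }
    (1 to 5).foreach(i => queue.offer(s"msg-$i")) // producers, like postMessage()
    queue.offer(PoisonPill)                       // shutdown, like Dispatcher.stop()
    pool.shutdown()
  }
}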
Next, look at the onStart method:
 
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the
  // metrics systems are started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory =
        new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory =
        new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
onStart mainly starts the web UI and the other listening services; the most important piece is ZooKeeper-based HA.

Spark's HA modes are ZooKeeper, filesystem, custom, and the default (a single node with no recovery).

In ZooKeeper mode, workers, drivers, and apps are by default serialized into ZooKeeper nodes; if the active Master dies, the standby reads that data back from the ZooKeeper nodes so that no information is lost. A standalone sketch of the idea follows.
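As an illustration of the persistence idea (the Curator and JavaSerializer calls are real APIs, but the connect string, znode path, and payload here are made up; the real ZooKeeperPersistenceEngine writes under spark.deploy.zookeeper.dir + "/master_status"):

import java.nio.ByteBuffer

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object ZkPersistSketch {
  def main(args: Array[String]): Unit = {
    val zk = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()
    val serializer = new JavaSerializer(new SparkConf).newInstance()
    // Serialize something worker-like and write it to a znode, as
    // ZooKeeperPersistenceEngine.persist() does for WorkerInfo and friends.
    val bytes: ByteBuffer = serializer.serialize("worker-20161129-hadoop1-7078")
    val payload = new Array[Byte](bytes.remaining())
    bytes.get(payload)
    zk.create().creatingParentsIfNeeded().forPath("/spark/master_status/demo", payload)
    zk.close()
  }
}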
val zkFactory =
  new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))

zkFactory.createLeaderElectionAgent(this) covers leader election and the HA failover. That logic lives mainly in the ZooKeeperLeaderElectionAgent class, whose initialization calls its start() method.
 
def createPersistenceEngine(): PersistenceEngine = {
  new ZooKeeperPersistenceEngine(conf, serializer)
}

zkFactory.createPersistenceEngine() first creates the persistence-engine object and the ZooKeeper directory it writes under.

def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
  new ZooKeeperLeaderElectionAgent(master, conf)
}

val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
// the directory used for the election

Then the agent's start() method runs:
private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}
public void start() throws Exception {
    Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED),
        "Cannot be started more than once");
    this.client.getConnectionStateListenable().addListener(this.listener);
    this.reset();
}
@VisibleForTesting
void reset() throws Exception {
    this.setLeadership(false);
    this.setNode((String) null);
    BackgroundCallback callback = new BackgroundCallback() {
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (LeaderLatch.this.debugResetWaitLatch != null) {
                LeaderLatch.this.debugResetWaitLatch.await();
                LeaderLatch.this.debugResetWaitLatch = null;
            }
            if (event.getResultCode() == Code.OK.intValue()) {
                LeaderLatch.this.setNode(event.getName());
                if (LeaderLatch.this.state.get() == LeaderLatch.State.CLOSED) {
                    LeaderLatch.this.setNode((String) null);
                } else {
                    LeaderLatch.this.getChildren();
                }
            } else {
                LeaderLatch.this.log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    this.client.create().creatingParentsIfNeeded().withProtection()
        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
        .inBackground(callback)
        .forPath(ZKPaths.makePath(this.latchPath, "latch-"), LeaderSelector.getIdBytes(this.id));
}
Afterwards, the latch fetches all the children and sorts them. If the node created by the current Master sorts first, the current Master is Active; otherwise it watches the node with the next-higher priority, and if that node changes it resets and attempts the election again. When the active Master changes, this is triggered:
override def electedLeader() {
  self.send(ElectedLeader)
}
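Putting the election pieces together, here is a minimal standalone Curator LeaderLatch sketch (real Curator API; the connect string and path are placeholders). isLeader/notLeader are what the Spark agent translates into electedLeader/revokedLeadership:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchDemo {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()
    val latch = new LeaderLatch(client, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      override def isLeader(): Unit = println("elected leader")      // -> electedLeader()
      override def notLeader(): Unit = println("leadership revoked") // -> revokedLeadership()
    })
    latch.start()       // creates an EPHEMERAL_SEQUENTIAL latch node and joins the election
    Thread.sleep(60000) // hold our place in the election for a minute
    latch.close()
    client.close()
  }
}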
Back in Spark, self.send(ElectedLeader) lands in the Master's receive, which handles it:
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
  RecoveryState.ALIVE
} else {
  RecoveryState.RECOVERING
}
It reads the information persisted in ZooKeeper and turns the standby into the active Master. That is the HA-based reliability story.
Finally, the Master sends itself a BoundPortsRequest message:

val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)

If no result comes back within a certain time, the request is sent again. Because this is a local message it is put straight into the EndpointData's inbox and dispatched to the Master's receiveAndReply:
case BoundPortsRequest =>
  context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
which replies to itself with the three bound ports.
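Conceptually, askWithRetry is just ask plus a bounded retry loop around an awaited Future. A minimal sketch of those semantics (the names and types here are illustrative, not Spark's implementation):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

object AskWithRetrySketch {
  // `ask` sends the request and yields a Future of the reply.
  def askWithRetry[T](ask: () => Future[T], maxRetries: Int, timeout: FiniteDuration): T = {
    var attempt = 0
    var lastError: Throwable = null
    while (attempt < maxRetries) {
      attempt += 1
      try {
        return Await.result(ask(), timeout) // wait up to `timeout` for the reply
      } catch {
        case NonFatal(e) => lastError = e   // timed out or failed: send again
      }
    }
    throw new RuntimeException(s"Gave up after $maxRetries attempts", lastError)
  }
}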
That completes the source analysis of the Spark 2.x Master startup.

 
