Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext.DRIVER_IDENTIFIER
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.PATH
Expand All @@ -45,7 +44,7 @@ private[spark] class SparkAsyncProfiler(conf: SparkConf, executorId: String) ext
// Application ID as recorded in the SparkConf under "spark.app.id"; None when the key is unset.
private def getAppId: Option[String] = conf.getOption("spark.app.id")
// Application attempt ID from "spark.app.attempt.id"; None when the key is unset
// (presumably only populated by cluster managers that support app attempts — verify against caller).
private def getAttemptId: Option[String] = conf.getOption("spark.app.attempt.id")

private val profileFile = if (executorId == DRIVER_IDENTIFIER) {
private val profileFile = if (SparkContext.isDriver(executorId)) {
s"profile-$executorId.jfr"
} else {
s"profile-exec-$executorId.jfr"
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
try {
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
if (SparkContext.isDriver(executorId)) {
Some(Utils.getThreadDump())
} else {
env.blockManager.master.getExecutorEndpointRef(executorId) match {
Expand Down Expand Up @@ -786,7 +786,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
private[spark] def getExecutorHeapHistogram(executorId: String): Option[Array[String]] = {
try {
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
if (SparkContext.isDriver(executorId)) {
Some(Utils.getHeapHistogram())
} else {
env.blockManager.master.getExecutorEndpointRef(executorId) match {
Expand Down Expand Up @@ -3163,6 +3163,11 @@ object SparkContext extends Logging {
/** Separator of tags in SPARK_JOB_TAGS property */
private[spark] val SPARK_JOB_TAGS_SEP = ","

/** Whether the given executor ID refers to the driver rather than a regular executor. */
private[spark] def isDriver(executorId: String): Boolean =
  executorId == DRIVER_IDENTIFIER

// Same rules apply to Spark Connect execution tags, see ExecuteHolder.throwIfInvalidTag
private[spark] def throwIfInvalidTag(tag: String) = {
if (tag == null) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class SparkEnv (
Preconditions.checkState(null == _shuffleManager,
"Shuffle manager already initialized to %s", _shuffleManager)
try {
_shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
_shuffleManager = ShuffleManager.create(conf, SparkContext.isDriver(executorId))
} finally {
// Signal that the ShuffleManager has been initialized
shuffleManagerInitLatch.countDown()
Expand Down Expand Up @@ -356,7 +356,7 @@ object SparkEnv extends Logging {
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
val isDriver = SparkContext.isDriver(executorId)

// Listener bus is only used on the driver
if (isDriver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private class SharedMessageLoop(
.getOrElse(math.max(2, availableCores))

conf.get(EXECUTOR_ID).map { id =>
val role = if (id == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
val role = if (SparkContext.isDriver(id)) "driver" else "executor"
conf.getInt(s"spark.$role.rpc.netty.dispatcher.numThreads", modNumThreads)
}.getOrElse(modNumThreads)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[netty] class NettyRpcEnv(
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
// Role label for this RPC environment, derived from the configured executor ID:
// "driver" when the ID identifies the driver, "executor" otherwise; None when no
// executor ID is set in the conf.
// NOTE: the flattened diff paste retained both the pre-change and post-change line
// of this hunk, leaving a duplicated dead statement — only the updated line is kept.
val role = conf.get(EXECUTOR_ID).map { id =>
  if (SparkContext.isDriver(id)) "driver" else "executor"
}

private[netty] val transportConf = SparkTransportConf.fromSparkConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private[spark] class EventLoggingListener(

override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
if (shouldLogStageExecutorMetrics) {
if (event.execId == SparkContext.DRIVER_IDENTIFIER) {
if (SparkContext.isDriver(event.execId)) {
logEvent(event)
}
event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ private[spark] object ShuffleBlockPusher {
private val BLOCK_PUSHER_POOL: ExecutorService = {
val conf = SparkEnv.get.conf
if (Utils.isPushBasedShuffleEnabled(conf,
isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId)) {
isDriver = SparkContext.isDriver(SparkEnv.get.executorId))) {
val numThreads = conf.get(SHUFFLE_NUM_PUSH_THREADS)
.getOrElse(conf.getInt(SparkLauncher.EXECUTOR_CORES, 1))
ThreadUtils.newDaemonFixedThreadPool(numThreads, "shuffle-block-push-thread")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ private[spark] class AppStatusListener(
// Implicitly exclude every available executor for the stage associated with this node
Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
val executorIds = liveExecutors.values.filter(exec => exec.host == hostId
&& exec.executorId != SparkContext.DRIVER_IDENTIFIER).map(_.executorId).toSeq
&& !SparkContext.isDriver(exec.executorId)).map(_.executorId).toSeq
setStageExcludedStatus(stage, now, executorIds: _*)
}
liveExecutors.values.filter(exec => exec.hostname == hostId
&& exec.executorId != SparkContext.DRIVER_IDENTIFIER).foreach { exec =>
&& !SparkContext.isDriver(exec.executorId)).foreach { exec =>
addExcludedStageTo(exec, stageId, now)
}
}
Expand Down Expand Up @@ -413,7 +413,7 @@ private[spark] class AppStatusListener(

// Implicitly (un)exclude every executor associated with the node.
liveExecutors.values.foreach { exec =>
if (exec.hostname == host && exec.executorId != SparkContext.DRIVER_IDENTIFIER) {
if (exec.hostname == host && !SparkContext.isDriver(exec.executorId)) {
updateExecExclusionStatus(exec, excluded, now)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private[spark] class AppStatusStore(
}

private def replaceExec(origin: v1.ExecutorSummary): v1.ExecutorSummary = {
if (origin.id == SparkContext.DRIVER_IDENTIFIER) {
if (SparkContext.isDriver(origin.id)) {
replaceDriverGcTime(origin, extractGcTime(origin), extractAppTime)
} else {
origin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
}

private def checkExecutorId(execId: String): Unit = {
if (execId != SparkContext.DRIVER_IDENTIFIER && !execId.forall(Character.isDigit)) {
if (!SparkContext.isDriver(execId) && !execId.forall(Character.isDigit)) {
throw new BadParameterException(
s"Invalid executorId: neither '${SparkContext.DRIVER_IDENTIFIER}' nor number.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private[spark] class BlockManager(

// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined
private val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
private val isDriver = SparkContext.isDriver(executorId)

private val remoteReadNioBufferConversion =
conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class BlockManagerId private (
// Public accessor for the private topologyInfo_ backing field; None when no topology info was set.
def topologyInfo: Option[String] = topologyInfo_

/** True when this block manager ID belongs to the driver (delegates to [[SparkContext.isDriver]]). */
def isDriver: Boolean = SparkContext.isDriver(executorId)

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class HeartbeatReceiverSuite
// We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
// so exclude it from the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
filter { case (k, _) => k != SparkContext.DRIVER_IDENTIFIER }
filter { case (k, _) => !SparkContext.isDriver(k) }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
val listener = new SparkListener {
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
if (!SparkContext.isDriver(executorMetricsUpdate.execId)) {
runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
events.foreach { event =>
event match {
case metricsUpdate: SparkListenerExecutorMetricsUpdate
if metricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER =>
if !SparkContext.isDriver(metricsUpdate.execId) =>
case stageCompleted: SparkListenerStageCompleted =>
val execIds = Set[String]()
(1 to 3).foreach { _ =>
Expand Down Expand Up @@ -631,7 +631,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
case (expected: SparkListenerExecutorMetricsUpdate,
actual: SparkListenerExecutorMetricsUpdate) =>
assert(expected.execId == actual.execId)
assert(expected.execId == SparkContext.DRIVER_IDENTIFIER)
assert(SparkContext.isDriver(expected.execId))
case (expected: SparkListenerEvent, actual: SparkListenerEvent) =>
assert(expected === actual)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
val executorId = executorMetricsUpdate.execId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
if (!SparkContext.isDriver(executorId)) {
val validUpdate = executorMetricsUpdate
.accumUpdates
.flatMap(_._4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
: Option[String] = {
val listWithoutDriver = list
.filterNot(_.id.equals(SparkContext.DRIVER_IDENTIFIER))
.filterNot(e => SparkContext.isDriver(e.id))
.filter(_.totalTasks >= minTasks)
val sortedList = policy match {
case ExecutorRollPolicy.ID =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ object StateStoreProvider extends Logging {
private[state] def coordinatorRef: Option[StateStoreCoordinatorRef] = synchronized {
val env = SparkEnv.get
if (env != null) {
val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER
val isDriver = SparkContext.isDriver(env.executorId)
// If running locally, then the coordinator reference in stateStoreCoordinatorRef may have
// become inactive as SparkContext + SparkEnv may have been restarted. Hence, when running in
// driver, always recreate the reference.
Expand Down Expand Up @@ -1765,8 +1765,7 @@ object StateStore extends Logging {
private def coordinatorRef: Option[StateStoreCoordinatorRef] = loadedProviders.synchronized {
val env = SparkEnv.get
if (env != null) {
val isDriver =
env.executorId == SparkContext.DRIVER_IDENTIFIER
val isDriver = SparkContext.isDriver(env.executorId)
// If running locally, then the coordinator reference in _coordRef may be have become inactive
// as SparkContext + SparkEnv may have been restarted. Hence, when running in driver,
// always recreate the reference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.spark.sql.streaming.util

import org.apache.spark.SparkContext.DRIVER_IDENTIFIER
import org.apache.spark.SparkEnv
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.RpcUtils
Expand Down Expand Up @@ -73,8 +72,7 @@ class GlobalManualClock(endpointName: String)

// Whether this process is the driver, judged by the executor ID published in SparkEnv.
// Delegates to SparkContext.isDriver, which also handles a null executor ID safely
// (String == performs a null-safe equality check in Scala).
// NOTE: the flattened diff paste retained the removed pre-change lines of this hunk
// alongside the added one — only the updated delegation is kept.
private def isDriver: Boolean = {
  val executorId = SparkEnv.get.executorId
  SparkContext.isDriver(executorId)
}

override def getTimeMillis(): Long = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
} else {
ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
!SparkContext.isDriver(blockManagerId.executorId) // Ignore the driver location
}.map { case (blockManagerId, _) =>
ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
}.toSeq
Expand Down