diff --git a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java index 3f74d9444ac7..27e969999753 100644 --- a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java +++ b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java @@ -96,5 +96,10 @@ public String executionStatus() { public double progressedPercentage() { return response.getProgressedPercentage(); } + + @Override + public String queueMetrics() { + return response.isSetQueueMetrics() ? response.getQueueMetrics() : ""; + } } } diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java index fe55e7267d3e..42e8c55ea7ef 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java @@ -175,6 +175,12 @@ public void render(ProgressMonitor monitor) { reprintLine(SEPARATOR); reprintLineWithColorAsBold(footer, Ansi.Color.RED); reprintLine(SEPARATOR); + + // Display queue metrics if available (may be multi-line: queue name + metrics) + String queueMetrics = monitor.queueMetrics(); + if (queueMetrics != null && !queueMetrics.isEmpty()) { + reprintMultiLine(queueMetrics); + } } diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java index 67dd7ca02f1e..1781c32820c9 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java @@ -52,6 +52,11 @@ public String executionStatus() { public double progressedPercentage() { return 0; } + + @Override + public String queueMetrics() { + return ""; + } }; List headers(); @@ -65,4 +70,6 @@ public double progressedPercentage() { String 
executionStatus(); double progressedPercentage(); + + String queueMetrics(); } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 31b5e32c2ddb..0a771c5a40ee 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3982,6 +3982,12 @@ public static enum ConfVars { HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false, "Whether to check user access to explicitly specified YARN queues. " + "yarn.resourcemanager.webapp.address must be configured to use this."), + HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL("hive.tez.queue.metrics.refresh.interval", "0s", + new TimeValidator(TimeUnit.SECONDS), + "Interval for refreshing YARN queue resource metrics during Tez query execution. " + + "When set to a positive value (e.g. 10s), displays real-time memory, vCore, capacity " + + "and application metrics for the YARN queue being used. " + + "Set to 0 or negative to disable. 
Minimum effective value is 1 second."), HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h", new TimeValidator(TimeUnit.HOURS), "The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java index 68844bd81728..e24d2bb0416a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGStatus; @@ -86,6 +87,7 @@ public String toString() { HiveConf getConf(); TezClient getTezClient(); + YarnClient getYarnClient(); boolean isOpen(); boolean isOpening(); boolean getDoAsEnabled(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 415072f221da..8b4a0410566d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGStatus; @@ -337,6 +338,11 @@ public TezClient getTezClient() { return baseSession.getTezClient(); } + @Override + public YarnClient getYarnClient() { + 
return baseSession.getYarnClient(); + } + @Override public boolean isOpening() { return baseSession.isOpening(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2924416ad480..8cd76f724c56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -73,6 +73,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; @@ -117,6 +118,7 @@ public class TezSessionState implements TezSession { Path tezScratchDir; protected LocalResource appJarLr; private TezClient session; + private YarnClient yarnClient; private Future sessionFuture; /** Console used for user feedback during async session opening. 
*/ private LogHelper console; @@ -750,6 +752,17 @@ public void close(boolean keepDagFilesDir) throws Exception { closeClient(asyncSession); } } + + // Stop YarnClient if it was initialized + if (yarnClient != null) { + try { + LOG.info("Stopping YarnClient for session: {}", sessionId); + yarnClient.stop(); + yarnClient = null; + } catch (Exception e) { + LOG.warn("Error stopping YarnClient for session {}: {}", sessionId, e.getMessage()); + } + } } finally { try { cleanupScratchDir(); @@ -795,6 +808,20 @@ public String getSessionId() { protected final void setTezClient(TezClient session) { this.session = session; + + // Initialize YarnClient for queue metrics collection + if (session != null && yarnClient == null) { + try { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + LOG.info("YarnClient initialized for session: {}", sessionId); + } catch (Exception e) { + LOG.warn("Failed to initialize YarnClient for metrics collection: {}", e.getMessage()); + LOG.debug("Full exception for YarnClient initialization failure", e); + yarnClient = null; + } + } } @Override @@ -820,6 +847,11 @@ public TezClient getTezClient() { return session; } + @Override + public YarnClient getYarnClient() { + return yarnClient; + } + @Override public LocalResource getAppJarLr() { return appJarLr; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java new file mode 100644 index 000000000000..171b1e610361 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Collects YARN queue resource metrics in the background using a scheduled executor. + * Provides thread-safe access to the latest metrics snapshot. 
+ */ +public class YarnQueueMetricsCollector { + private static final Logger LOG = LoggerFactory.getLogger(YarnQueueMetricsCollector.class); + private static final Random RANDOM = new Random(); + + private final YarnClient yarnClient; + private final String queueName; + private final ScheduledExecutorService executorService; + private final AtomicReference snapshotRef; + private final AtomicBoolean isShutdown; + + // Circuit breaker for handling repeated failures + private int consecutiveFailures = 0; + private static final int MAX_CONSECUTIVE_FAILURES = 5; + private static final int BACKOFF_THRESHOLD = 3; + + /** + * Creates a new metrics collector that immediately begins collecting queue metrics. + * + * @param yarnClient The YarnClient to use for querying queue info + * @param queueName The queue name to monitor + * @param refreshIntervalMs How often to refresh metrics in milliseconds + * @param queryId The query ID for thread naming + * @throws IllegalArgumentException if yarnClient or queueName is null + */ + public YarnQueueMetricsCollector(YarnClient yarnClient, String queueName, + long refreshIntervalMs, String queryId) { + if (yarnClient == null) { + throw new IllegalArgumentException("YarnClient cannot be null"); + } + if (queueName == null) { + throw new IllegalArgumentException("Queue name cannot be null"); + } + + this.yarnClient = yarnClient; + this.queueName = queueName; + this.snapshotRef = new AtomicReference<>(null); + this.isShutdown = new AtomicBoolean(false); + + // Create named daemon thread for metrics collection + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("yarn-queue-metrics-collector-" + queryId) + .setDaemon(true) + .build() + ); + + try { + // Perform eager initial collection + collectMetrics(); + + // Add random jitter (0-20% of refresh interval) to prevent thundering herd: + // when many queries start simultaneously, they would otherwise all hit YARN RM + // at the 
same fixed intervals, causing load spikes. + long jitter = (long) (refreshIntervalMs * RANDOM.nextDouble() * 0.2); + long initialDelay = refreshIntervalMs + jitter; + + // Schedule periodic collection with jittered initial delay + executorService.scheduleWithFixedDelay( + () -> { + try { + collectMetrics(); + } catch (Exception e) { + LOG.error("Unexpected error in scheduled metrics collection for queue {}: {}", + queueName, e.getMessage(), e); + } + }, + initialDelay, + refreshIntervalMs, + TimeUnit.MILLISECONDS + ); + + LOG.info("Started YARN queue metrics collector for queue: {}, refresh interval: {}ms, initial delay: {}ms", + queueName, refreshIntervalMs, initialDelay); + } catch (RuntimeException e) { + // If initialization fails, clean up the executor to prevent a thread leak + LOG.error("Failed to initialize metrics collector for queue {}, shutting down executor", queueName, e); + executorService.shutdownNow(); + throw new RuntimeException("Failed to initialize YARN queue metrics collector", e); + } + } + + /** + * Checks if an exception is or was caused by an InterruptedException. + * + * @param e The exception to check + * @return true if the exception is an InterruptedException or has one as its cause + */ + private boolean isInterruptedException(Exception e) { + return e instanceof InterruptedException || e.getCause() instanceof InterruptedException; + } + + /** + * Collects queue metrics and updates the snapshot. + * Handles all exceptions gracefully by setting snapshot to null. + * Implements circuit breaker pattern to back off on repeated failures. + */ + private void collectMetrics() { + // Circuit breaker: Skip collection if too many consecutive failures + // This prevents hammering a struggling YARN ResourceManager + if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { + if (consecutiveFailures == MAX_CONSECUTIVE_FAILURES) { + LOG.warn("Queue metrics collection has failed {} times consecutively for queue {}. 
" + + "Temporarily reducing collection attempts to avoid overloading YARN RM. " + + "Will retry periodically.", MAX_CONSECUTIVE_FAILURES, queueName); + consecutiveFailures++; // Increment to avoid repeated logging + } + // Still attempt collection occasionally, but skip most attempts + if (RANDOM.nextDouble() > 0.1) { // Only try 10% of the time + return; + } + } + + try { + QueueInfo queueInfo = yarnClient.getQueueInfo(queueName); + if (queueInfo != null) { + QueueMetricsSnapshot snapshot = new QueueMetricsSnapshot(queueInfo); + snapshotRef.set(snapshot); + + // Success - reset circuit breaker + if (consecutiveFailures > 0) { + LOG.info("Queue metrics collection recovered for queue {} after {} failures", + queueName, consecutiveFailures); + consecutiveFailures = 0; + } + + LOG.debug("Collected queue metrics for {}: memory={}/{} GB, vCores={}/{}", + queueName, snapshot.memoryUsedGB, snapshot.memoryTotalGB, + snapshot.vCoresUsed, snapshot.vCoresTotal); + } else { + LOG.warn("QueueInfo is null for queue: {}", queueName); + consecutiveFailures++; + snapshotRef.set(null); + } + } catch (Exception e) { + if (isInterruptedException(e)) { + LOG.debug("Metrics collection interrupted for queue: {}", queueName); + Thread.currentThread().interrupt(); + // Don't increment failure counter or set snapshot to null, preserve last good state on interrupt + return; + } + + // Increment failure counter + consecutiveFailures++; + + // Log warnings for first few failures, then reduce logging frequency + if (consecutiveFailures <= BACKOFF_THRESHOLD) { + LOG.warn("Failed to collect queue metrics for queue {} (failure {} of {}): {}", + queueName, consecutiveFailures, MAX_CONSECUTIVE_FAILURES, e.getMessage()); + } else if (consecutiveFailures == MAX_CONSECUTIVE_FAILURES) { + LOG.warn("Queue metrics collection failing repeatedly for queue {} ({} consecutive failures). 
" + + "This may indicate YARN RM is under heavy load or unreachable.", + queueName, consecutiveFailures); + } + + LOG.debug("Full exception for queue metrics collection failure", e); + snapshotRef.set(null); + } + } + + /** + * Gets the latest metrics snapshot in a thread-safe manner. + * + * @return The latest snapshot, or null if collection failed or not yet completed + */ + public QueueMetricsSnapshot getLatestSnapshot() { + return snapshotRef.get(); + } + + /** + * Gets the queue name being monitored. + * + * @return The queue name + */ + public String getQueueName() { + return queueName; + } + + /** + * Shuts down the metrics collector gracefully. + * This method is idempotent and thread-safe. + */ + public synchronized void shutdown() { + if (isShutdown.getAndSet(true)) { + return; // Already shut down + } + + LOG.info("Shutting down YARN queue metrics collector for queue: {}", queueName); + + try { + executorService.shutdownNow(); + boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS); + if (!terminated) { + LOG.warn("Metrics collector for queue {} did not terminate within timeout", queueName); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while shutting down metrics collector for queue: {}", queueName); + Thread.currentThread().interrupt(); + } + } + + /** + * Immutable snapshot of queue metrics at a point in time. + */ + public static final class QueueMetricsSnapshot { + private final float memoryUsedGB; + private final float memoryTotalGB; + private final int vCoresUsed; + private final int vCoresTotal; + private final float capacityPercentage; + private final int runningApps; + private final int pendingContainers; + private final long collectionTimestamp; + + /** + * Creates a snapshot from QueueInfo. 
+ * + * @param queueInfo The queue info to extract metrics from + * @throws IllegalArgumentException if queueInfo is null + */ + public QueueMetricsSnapshot(QueueInfo queueInfo) { + if (queueInfo == null) { + throw new IllegalArgumentException("QueueInfo cannot be null"); + } + + this.collectionTimestamp = System.currentTimeMillis(); + + // Extract queue statistics with null-safe handling + QueueStatistics stats = queueInfo.getQueueStatistics(); + if (stats != null) { + // Convert memory from MB to GB. Total = Allocated + Available + this.memoryUsedGB = stats.getAllocatedMemoryMB() / 1024.0f; + this.memoryTotalGB = (stats.getAllocatedMemoryMB() + stats.getAvailableMemoryMB()) / 1024.0f; + this.vCoresUsed = (int) stats.getAllocatedVCores(); + this.vCoresTotal = (int) (stats.getAllocatedVCores() + stats.getAvailableVCores()); + this.runningApps = (int) stats.getNumAppsRunning(); + this.pendingContainers = (int) stats.getPendingContainers(); + } else { + LOG.debug("QueueStatistics is null for queue, using zero values"); + this.memoryUsedGB = 0; + this.memoryTotalGB = 0; + this.vCoresUsed = 0; + this.vCoresTotal = 0; + this.runningApps = 0; + this.pendingContainers = 0; + } + + // Get capacity percentage + this.capacityPercentage = queueInfo.getCapacity() * 100; + } + + public float getMemoryUsedGB() { + return memoryUsedGB; + } + + public float getMemoryTotalGB() { + return memoryTotalGB; + } + + public int getVCoresUsed() { + return vCoresUsed; + } + + public int getVCoresTotal() { + return vCoresTotal; + } + + public float getCapacityPercentage() { + return capacityPercentage; + } + + public int getRunningApps() { + return runningApps; + } + + public int getPendingContainers() { + return pendingContainers; + } + + public long getCollectionTimestamp() { + return collectionTimestamp; + } + + /** + * Gets the memory usage percentage as a formatted string. 
+ * + * @return Formatted percentage like "50.25%" or "N/A" if total is zero + */ + public String getMemoryPercentage() { + if (memoryTotalGB > 0) { + return String.format("%.2f%%", (memoryUsedGB / memoryTotalGB) * 100); + } + return "N/A"; + } + + /** + * Gets the vCore usage percentage as a formatted string. + * + * @return Formatted percentage like "75.00%" or "N/A" if total is zero + */ + public String getVCoresPercentage() { + if (vCoresTotal > 0) { + return String.format("%.2f%%", ((float) vCoresUsed / vCoresTotal) * 100); + } + return "N/A"; + } + } +} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 92844f4d5716..2146c2b9100c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.exec.tez.Utils; +import org.apache.hadoop.hive.ql.exec.tez.YarnQueueMetricsCollector; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -119,6 +121,7 @@ public static void initShutdownHook() { private final RenderStrategy.UpdateFunction updateFunction; // compile time tez counters private final TezCounters counters; + private YarnQueueMetricsCollector metricsCollector; public TezJobMonitor(TezSession session, List topSortedWorks, final DAGClient dagClient, HiveConf conf, DAG dag, Context ctx, final TezCounters counters, PerfLogger perfLogger) { @@ -134,6 +137,9 @@ public TezJobMonitor(TezSession session, List topSortedWorks, final DA this.counters = counters; this.shouldCollectSummaryString = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_ENABLED) && conf.getBoolVar(ConfVars.HIVE_QUERY_HISTORY_EXEC_SUMMARY_ENABLED); + + // Initialize YARN queue metrics collector if enabled + this.metricsCollector = initializeMetricsCollector(); } private RenderStrategy.UpdateFunction updateFunction() { @@ -144,6 +150,53 @@ private RenderStrategy.UpdateFunction updateFunction() { : new RenderStrategy.LogToFileFunction(this, perfLogger); } + private YarnQueueMetricsCollector initializeMetricsCollector() { + // Get refresh interval - controls whether the feature is enabled. + // interval <= 0 means disabled (default is 0s = disabled). + long refreshInterval = HiveConf.getTimeVar(hiveConf, + ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, TimeUnit.MILLISECONDS); + + if (refreshInterval <= 0) { + LOG.debug("Queue metrics collection disabled (refresh interval: {}ms)", refreshInterval); + return null; + } + + try { + // Get YarnClient from session + YarnClient yarnClient = session.getYarnClient(); + if (yarnClient == null) { + LOG.warn("YarnClient not available, skipping queue metrics collection"); + return null; + } + + // Get queue name, default to "default" if not specified + String queueName = session.getQueueName(); + if (queueName == null || queueName.trim().isEmpty()) { + queueName = "default"; + LOG.info("Queue name not specified, using default queue"); + } + + // Validate minimum refresh interval (at least 1 second) + if (refreshInterval < 1000) { + LOG.warn("Queue metrics refresh interval {}ms is less than minimum 1000ms, using 1000ms", + refreshInterval); + refreshInterval = 1000; + } + + // Get query ID from DAG name + String queryId = dag.getName(); + + LOG.info("Initializing YARN queue metrics collector for queue: {}, refresh interval: {}ms", + queueName, refreshInterval); + + return new YarnQueueMetricsCollector(yarnClient, queueName, refreshInterval, queryId); + } catch (Exception e) { + LOG.warn("Unable to initialize YARN queue metrics collector: {}", 
e.getMessage()); + LOG.debug("Full exception for queue metrics initialization failure", e); + return null; + } + } + private boolean isProfilingEnabled() { return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(hiveConf); @@ -312,6 +365,16 @@ public int monitorExecution() { synchronized (shutdownList) { shutdownList.remove(dagClient); } + + // Shutdown metrics collector if it was initialized + if (metricsCollector != null) { + try { + metricsCollector.shutdown(); + } catch (Exception e) { + LOG.warn("Error shutting down queue metrics collector", e); + } + } + break; } } @@ -516,7 +579,7 @@ public String getDiagnostics() { ProgressMonitor progressMonitor(DAGStatus status, Map progressMap) { try { return new TezProgressMonitor(dagClient, status, topSortedWorks, progressMap, console, - executionStartTime); + executionStartTime, metricsCollector); } catch (IOException | TezException e) { console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " + ExceptionUtils.getStackTrace(e)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java index 735442d2d1c8..4957ac68ce1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec.tez.monitoring; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.hadoop.hive.ql.exec.tez.YarnQueueMetricsCollector; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.tez.dag.api.TezException; @@ -44,6 +46,7 @@ public class TezProgressMonitor implements ProgressMonitor { private final SessionState.LogHelper console; 
private final long executionStartTime; private final DAGStatus status; + private final YarnQueueMetricsCollector metricsCollector; Map vertexStatusMap = new HashMap<>(); Map progressCountsMap = new HashMap<>(); @@ -54,10 +57,18 @@ public class TezProgressMonitor implements ProgressMonitor { TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, Map progressMap, SessionState.LogHelper console, long executionStartTime) throws IOException, TezException { + this(dagClient, status, topSortedWork, progressMap, console, executionStartTime, null); + } + + TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, + Map progressMap, SessionState.LogHelper console, long executionStartTime, + YarnQueueMetricsCollector metricsCollector) + throws IOException, TezException { this.status = status; this.topSortedWork = topSortedWork; this.console = console; this.executionStartTime = executionStartTime; + this.metricsCollector = metricsCollector; for (Map.Entry entry : progressMap.entrySet()) { String vertexName = entry.getKey(); progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState())); @@ -327,6 +338,62 @@ public int hashCode() { } } + @Override + public String queueMetrics() { + if (metricsCollector == null) { + return ""; + } + + try { + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = metricsCollector.getLatestSnapshot(); + if (snapshot == null) { + return "QUEUE: unavailable"; + } + + // Calculate staleness + long stalenessSeconds = (System.currentTimeMillis() - snapshot.getCollectionTimestamp()) / 1000; + String stalenessStr = stalenessSeconds > 60 ? 
">60s ago" : stalenessSeconds + "s ago"; + + // Truncate queue name from start if too long + String queueName = metricsCollector.getQueueName(); + // Max length for line 1: 94 - "QUEUE: (XXXs ago)".length() + String stalenessAppend = " (" + stalenessStr + ")"; + int maxQueueNameLength = InPlaceUpdate.MIN_TERMINAL_WIDTH - "QUEUE: ".length() - stalenessAppend.length(); + String displayQueueName = queueName; + if (queueName.length() > maxQueueNameLength && maxQueueNameLength > 3) { + int keepLength = maxQueueNameLength - 3; // Leave room for "..." + displayQueueName = "..." + queueName.substring(queueName.length() - keepLength); + } + + // Line 1: Queue name + staleness + String line1 = "QUEUE: " + displayQueueName + stalenessAppend; + + // Line 2: Memory + VCores (resource usage) + String line2 = String.format( + "MEMORY: %.1f/%.1f GB (%s) | VCORES: %d/%d (%s)", + snapshot.getMemoryUsedGB(), + snapshot.getMemoryTotalGB(), + snapshot.getMemoryPercentage(), + snapshot.getVCoresUsed(), + snapshot.getVCoresTotal(), + snapshot.getVCoresPercentage() + ); + + // Line 3: Capacity + Active Apps + Pending (cluster-level info) + String line3 = String.format( + "CAPACITY: %.2f%% | ACTIVE_APPS: %d | PENDING: %d", + snapshot.getCapacityPercentage(), + snapshot.getRunningApps(), + snapshot.getPendingContainers() + ); + + return line1 + "\n" + line2 + "\n" + line3; + } catch (Exception e) { + console.printInfo("Error formatting queue metrics: " + e.getMessage()); + return "QUEUE: unavailable"; + } + } + public DAGStatus getStatus() { return status; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 53ee29743449..8b8836ec6d2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -2251,6 +2251,11 @@ public String executionStatus() { public double progressedPercentage() { return percentage; } 
+ + @Override + public String queueMetrics() { + return ""; + } }; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java new file mode 100644 index 000000000000..fc1b45cd0068 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueStatistics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mockingDetails;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test cases for YarnQueueMetricsCollector.
+ *
+ * The YARN collaborators (YarnClient, QueueInfo, QueueStatistics) are Mockito mocks, so no
+ * ResourceManager is contacted. Every collector created by a test is shut down in a finally
+ * block so that a failed assertion does not leave it running.
+ */
+public class TestYarnQueueMetricsCollector {
+
+  @Mock
+  private YarnClient mockYarnClient;
+
+  @Mock
+  private QueueInfo mockQueueInfo;
+
+  @Mock
+  private QueueStatistics mockQueueStats;
+
+  private AutoCloseable closeable;
+
+  private static final long WAIT_TIMEOUT_MS = 5000;
+
+  /** Pause between two polls in the wait helpers; keeps them from spinning a CPU core. */
+  private static final long POLL_INTERVAL_MS = 5;
+
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  @Before
+  public void setUp() {
+    closeable = MockitoAnnotations.openMocks(this);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (closeable != null) {
+      closeable.close();
+    }
+  }
+
+  /**
+   * Sleeps for one poll interval. An interruption is restored on the thread and turned into a
+   * test failure instead of being swallowed.
+   */
+  private static void pauseBetweenPolls() {
+    try {
+      Thread.sleep(POLL_INTERVAL_MS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      fail("Interrupted while waiting for the metrics collector");
+    }
+  }
+
+  /**
+   * Waits for a snapshot to be available (non-null).
+   *
+   * @param collector collector to poll
+   * @param timeoutMs maximum time to wait, in milliseconds
+   * @return the first non-null snapshot observed; the test fails if none appears in time
+   */
+  private YarnQueueMetricsCollector.QueueMetricsSnapshot waitForSnapshot(
+      YarnQueueMetricsCollector collector, long timeoutMs) {
+    // nanoTime is monotonic, so a wall-clock adjustment cannot shorten or stretch the wait.
+    final long deadlineNanos = System.nanoTime() + timeoutMs * NANOS_PER_MILLI;
+    YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = collector.getLatestSnapshot();
+    while (snapshot == null) {
+      if (System.nanoTime() - deadlineNanos > 0) {
+        fail("Snapshot not available after " + timeoutMs + "ms");
+      }
+      pauseBetweenPolls();
+      snapshot = collector.getLatestSnapshot();
+    }
+    return snapshot;
+  }
+
+  /**
+   * Waits for a specific number of invocations with timeout.
+   *
+   * Every invocation recorded on the mock is counted, whatever the method. The helper returns
+   * silently on timeout; the calling test is expected to check the count itself.
+   *
+   * @param mock      Mockito mock whose recorded invocations are counted
+   * @param minCount  number of invocations to wait for
+   * @param timeoutMs maximum time to wait, in milliseconds
+   */
+  private void waitForInvocationCount(Object mock, int minCount, long timeoutMs) {
+    final long deadlineNanos = System.nanoTime() + timeoutMs * NANOS_PER_MILLI;
+    while (mockingDetails(mock).getInvocations().size() < minCount) {
+      if (System.nanoTime() - deadlineNanos > 0) {
+        return; // Don't fail, just return - test will check the count
+      }
+      pauseBetweenPolls();
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testConstructorWithNullYarnClient() {
+    new YarnQueueMetricsCollector(null, "default", 1000, "query-1");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testConstructorWithNullQueueName() {
+    new YarnQueueMetricsCollector(mockYarnClient, null, 1000, "query-1");
+  }
+
+  @Test
+  public void testSuccessfulMetricsCollection() throws Exception {
+    // Setup mock QueueStatistics
+    when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(8192L); // 8GB in MB
+    when(mockQueueStats.getAvailableMemoryMB()).thenReturn(8192L); // 8GB available in MB
+    when(mockQueueStats.getAllocatedVCores()).thenReturn(100L);
+    when(mockQueueStats.getAvailableVCores()).thenReturn(100L);
+    when(mockQueueStats.getNumAppsRunning()).thenReturn(5L);
+    when(mockQueueStats.getPendingContainers()).thenReturn(10L);
+
+    // Setup mock QueueInfo
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.25f); // 25%
+
+    // Setup YarnClient to return mocked QueueInfo
+    when(mockYarnClient.getQueueInfo("default")).thenReturn(mockQueueInfo);
+
+    // Create collector
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 10000, "test-query-1");
+
+    try {
+      // Wait for initial collection to complete
+      YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot =
+          waitForSnapshot(collector, WAIT_TIMEOUT_MS);
+
+      assertNotNull("Snapshot should not be null", snapshot);
+      assertEquals("Memory used should be 8GB", 8.0f, snapshot.getMemoryUsedGB(), 0.1f);
+      assertEquals("Memory total should be 16GB (8+8)", 16.0f, snapshot.getMemoryTotalGB(), 0.1f);
+      assertEquals("VCores used should be 100", 100, snapshot.getVCoresUsed());
+      assertEquals("VCores total should be 200 (100+100)", 200, snapshot.getVCoresTotal());
+      assertEquals("Running apps should be 5", 5, snapshot.getRunningApps());
+      assertEquals("Pending containers should be 10", 10, snapshot.getPendingContainers());
+      assertEquals("Capacity should be 25%", 25.0f, snapshot.getCapacityPercentage(), 0.1f);
+
+      // Verify percentages
+      assertEquals("Memory percentage", "50.00%", snapshot.getMemoryPercentage());
+      assertEquals("VCores percentage", "50.00%", snapshot.getVCoresPercentage());
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testMetricsCollectionWithNullQueueInfo() throws Exception {
+    // YarnClient returns null for queue info (queue doesn't exist)
+    when(mockYarnClient.getQueueInfo("nonexistent")).thenReturn(null);
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "nonexistent", 10000, "test-query-2");
+
+    try {
+      // Constructor performs eager collection, snapshot should already be null
+      // (No need to wait since null QueueInfo means immediate null snapshot)
+      assertNull("Snapshot should be null for nonexistent queue",
+          collector.getLatestSnapshot());
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testMetricsCollectionWithNullQueueStatistics() throws Exception {
+    // QueueInfo exists but QueueStatistics is null
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(null);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.5f); // 50%
+    when(mockYarnClient.getQueueInfo("default")).thenReturn(mockQueueInfo);
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 10000, "test-query-3");
+
+    try {
+      YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot =
+          waitForSnapshot(collector, WAIT_TIMEOUT_MS);
+
+      assertNotNull("Snapshot should not be null", snapshot);
+      // Should have zero values when QueueStatistics is null
+      assertEquals("Memory used should be 0", 0.0f, snapshot.getMemoryUsedGB(), 0.01f);
+      assertEquals("Memory total should be 0", 0.0f, snapshot.getMemoryTotalGB(), 0.01f);
+      assertEquals("VCores used should be 0", 0, snapshot.getVCoresUsed());
+      assertEquals("VCores total should be 0", 0, snapshot.getVCoresTotal());
+      assertEquals("Capacity should still be 50%", 50.0f, snapshot.getCapacityPercentage(), 0.1f);
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testPercentageCalculationWithZeroTotal() {
+    // Setup with zero totals
+    when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(0L);
+    when(mockQueueStats.getAvailableMemoryMB()).thenReturn(0L);
+    when(mockQueueStats.getAllocatedVCores()).thenReturn(0L);
+    when(mockQueueStats.getAvailableVCores()).thenReturn(0L);
+    when(mockQueueStats.getNumAppsRunning()).thenReturn(0L);
+    when(mockQueueStats.getPendingContainers()).thenReturn(0L);
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.0f);
+
+    YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot =
+        new YarnQueueMetricsCollector.QueueMetricsSnapshot(mockQueueInfo);
+
+    // Should return "N/A" for percentages when total is zero
+    assertEquals("Memory percentage should be N/A", "N/A", snapshot.getMemoryPercentage());
+    assertEquals("VCores percentage should be N/A", "N/A", snapshot.getVCoresPercentage());
+  }
+
+  @Test
+  public void testShutdownIdempotency() throws Exception {
+    when(mockYarnClient.getQueueInfo("default")).thenReturn(mockQueueInfo);
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 10000, "test-query-4");
+
+    // Shutdown multiple times should not cause issues. The test passes when none of the
+    // calls throws; no further assertion is needed.
+    collector.shutdown();
+    collector.shutdown();
+    collector.shutdown();
+  }
+
+  @Test
+  public void testExceptionDuringCollection() throws Exception {
+    // YarnClient throws exception
+    when(mockYarnClient.getQueueInfo("default"))
+        .thenThrow(new RuntimeException("RM unavailable"));
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 10000, "test-query-5");
+
+    try {
+      // Constructor performs eager collection which throws exception
+      // Snapshot should already be null
+      assertNull("Snapshot should be null after exception",
+          collector.getLatestSnapshot());
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testQueueNameRetrieval() throws Exception {
+    when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo);
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(null);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.5f);
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "production", 10000, "test-query-6");
+
+    try {
+      assertEquals("Queue name should match", "production", collector.getQueueName());
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testMemoryAndVCoreCalculation() {
+    // Test with specific values to verify calculation
+    when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(5120L); // 5GB used
+    when(mockQueueStats.getAvailableMemoryMB()).thenReturn(15360L); // 15GB available
+    when(mockQueueStats.getAllocatedVCores()).thenReturn(50L);
+    when(mockQueueStats.getAvailableVCores()).thenReturn(150L);
+    when(mockQueueStats.getNumAppsRunning()).thenReturn(3L);
+    when(mockQueueStats.getPendingContainers()).thenReturn(7L);
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.2f); // 20%
+
+    YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot =
+        new YarnQueueMetricsCollector.QueueMetricsSnapshot(mockQueueInfo);
+
+    // Total = Used + Available
+    assertEquals("Memory used", 5.0f, snapshot.getMemoryUsedGB(), 0.01f);
+    assertEquals("Memory total", 20.0f, snapshot.getMemoryTotalGB(), 0.01f); // 5+15
+    assertEquals("Memory percentage", "25.00%", snapshot.getMemoryPercentage()); // 5/20
+
+    assertEquals("VCores used", 50, snapshot.getVCoresUsed());
+    assertEquals("VCores total", 200, snapshot.getVCoresTotal()); // 50+150
+    assertEquals("VCores percentage", "25.00%", snapshot.getVCoresPercentage()); // 50/200
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testQueueMetricsSnapshotWithNullQueueInfo() {
+    new YarnQueueMetricsCollector.QueueMetricsSnapshot(null);
+  }
+
+  // -------------------------------------------------------------------------
+  // Tests for Issue #1: Jitter on initial delay (Thundering Herd prevention)
+  // -------------------------------------------------------------------------
+  @Test
+  public void testInitialDelayHasJitter() throws Exception {
+    when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo);
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(null);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.5f);
+
+    long refreshIntervalMs = 10000;
+
+    // The scheduled delay cannot be read back through the collector API used in this class,
+    // so this is a smoke test only: several collectors on the same queue must construct
+    // without throwing.
+    // NOTE(review): the spread of the jittered delays is not asserted anywhere - confirm
+    // whether the collector exposes a hook that would allow checking it.
+    YarnQueueMetricsCollector[] collectors = new YarnQueueMetricsCollector[5];
+    try {
+      for (int i = 0; i < collectors.length; i++) {
+        collectors[i] = new YarnQueueMetricsCollector(
+            mockYarnClient, "default", refreshIntervalMs, "jitter-test-query-" + i);
+      }
+    } finally {
+      // Entries stay null if a constructor threw part-way through the loop.
+      for (YarnQueueMetricsCollector c : collectors) {
+        if (c != null) {
+          c.shutdown();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testExecutorCleanupOnInitializationFailure() throws Exception {
+    // Simulate YarnClient throwing on every call, including the eager collection performed
+    // by the constructor.
+    when(mockYarnClient.getQueueInfo(anyString()))
+        .thenThrow(new RuntimeException("Simulated RM failure during init"));
+
+    // The constructor is expected to complete normally: the failure of the first collection
+    // must not escape it, and the snapshot simply stays null.
+    YarnQueueMetricsCollector collector = null;
+    try {
+      collector = new YarnQueueMetricsCollector(
+          mockYarnClient, "default", 10000, "init-fail-query");
+      assertNull("Snapshot should be null after init failure",
+          collector.getLatestSnapshot());
+    } finally {
+      if (collector != null) {
+        collector.shutdown();
+      }
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  // Tests for Issue #2: Circuit breaker for repeated failures
+  // -------------------------------------------------------------------------
+  @Test
+  public void testCircuitBreakerActivatesAfterMaxFailures() throws Exception {
+    // YarnClient always throws - triggers circuit breaker after MAX_CONSECUTIVE_FAILURES
+    when(mockYarnClient.getQueueInfo(anyString()))
+        .thenThrow(new RuntimeException("YARN RM unavailable"));
+
+    // Use a very short interval so failures accumulate quickly
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 50, "circuit-breaker-query-1");
+
+    try {
+      // Wait for at least 6 invocations to allow circuit breaker to activate
+      waitForInvocationCount(mockYarnClient, 6, 1000);
+
+      // Snapshot should be null throughout
+      assertNull("Snapshot should be null when circuit breaker active",
+          collector.getLatestSnapshot());
+
+      // After circuit breaker kicks in, the number of YarnClient calls should be
+      // significantly less than if there was no circuit breaker.
+      // NOTE(review): the wait above returns as soon as 6 calls have been recorded, so this
+      // bound is evaluated right after the 6th call and would also hold without any
+      // throttling. Observing a fixed time window would make the check meaningful - confirm
+      // the intended window against the collector's back-off policy.
+      int callCount = mockingDetails(mockYarnClient).getInvocations().size();
+      assertTrue("Circuit breaker should reduce calls (got " + callCount + ")",
+          callCount < 12);
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testCircuitBreakerResetsOnSuccess() throws Exception {
+    // First 5 calls fail, then succeed
+    when(mockYarnClient.getQueueInfo(anyString()))
+        .thenThrow(new RuntimeException("Temporary RM failure"))
+        .thenThrow(new RuntimeException("Temporary RM failure"))
+        .thenThrow(new RuntimeException("Temporary RM failure"))
+        .thenThrow(new RuntimeException("Temporary RM failure"))
+        .thenThrow(new RuntimeException("Temporary RM failure"))
+        .thenReturn(mockQueueInfo); // 6th call succeeds
+
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats);
+    when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(4096L);
+    when(mockQueueStats.getAvailableMemoryMB()).thenReturn(4096L);
+    when(mockQueueStats.getAllocatedVCores()).thenReturn(50L);
+    when(mockQueueStats.getAvailableVCores()).thenReturn(50L);
+    when(mockQueueStats.getNumAppsRunning()).thenReturn(2L);
+    when(mockQueueStats.getPendingContainers()).thenReturn(5L);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.3f);
+
+    // Use short interval so failures accumulate and recovery happens quickly
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 30, "circuit-breaker-recovery-query");
+
+    try {
+      // Wait for recovery - snapshot should eventually be populated
+      YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = waitForSnapshot(collector, 2000);
+
+      assertNotNull("Snapshot should be populated after circuit breaker recovery", snapshot);
+      assertEquals("Memory used should be 4GB", 4.0f, snapshot.getMemoryUsedGB(), 0.1f);
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testCircuitBreakerDoesNotAffectSuccessfulCollection() throws Exception {
+    // Normal operation - no failures, circuit breaker should never activate
+    when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(2048L);
+    when(mockQueueStats.getAvailableMemoryMB()).thenReturn(2048L);
+    when(mockQueueStats.getAllocatedVCores()).thenReturn(20L);
+    when(mockQueueStats.getAvailableVCores()).thenReturn(20L);
+    when(mockQueueStats.getNumAppsRunning()).thenReturn(1L);
+    when(mockQueueStats.getPendingContainers()).thenReturn(0L);
+    when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats);
+    when(mockQueueInfo.getCapacity()).thenReturn(0.1f);
+    when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo);
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "default", 10000, "no-failures-query");
+
+    try {
+      // Wait for snapshot to be available
+      YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot =
+          waitForSnapshot(collector, WAIT_TIMEOUT_MS);
+
+      // Snapshot should be available and correct
+      assertNotNull("Snapshot should be available with no failures", snapshot);
+      assertEquals("Memory used should be 2GB", 2.0f, snapshot.getMemoryUsedGB(), 0.1f);
+      assertEquals("VCores used should be 20", 20, snapshot.getVCoresUsed());
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+
+  @Test
+  public void testNullQueueInfoIncreasesFailureCounter() throws Exception {
+    // null QueueInfo (queue doesn't exist) should also count as a failure
+    when(mockYarnClient.getQueueInfo(anyString())).thenReturn(null);
+
+    YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector(
+        mockYarnClient, "nonexistent-queue", 50, "null-queueinfo-query");
+
+    try {
+      // Wait for at least 6 invocations to allow circuit breaker to activate
+      waitForInvocationCount(mockYarnClient, 6, 800);
+
+      // Should still be null - null QueueInfo is treated as failure
+      assertNull("Snapshot should remain null for null QueueInfo",
+          collector.getLatestSnapshot());
+
+      // Circuit breaker should have activated - call count should be limited
+      // NOTE(review): as in testCircuitBreakerActivatesAfterMaxFailures, the count is read
+      // right after the wait returns at 6 calls, so this bound does not by itself prove that
+      // calls were throttled.
+      int callCount = mockingDetails(mockYarnClient).getInvocations().size();
+      assertTrue("Circuit breaker should reduce calls for null QueueInfo (got " + callCount + ")",
+          callCount < 10);
+
+    } finally {
+      collector.shutdown();
+    }
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java
new file mode 100644
index 000000000000..de5f10b116a9
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.client.DAGClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.atLeastOnce; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.anyString; + + + +/** + * Test cases for TezJobMonitor queue metrics initialization. 
+ */
+public class TestTezJobMonitorQueueMetrics {
+
+  @Mock
+  private TezSession mockSession;
+
+  @Mock
+  private DAGClient mockDagClient;
+
+  @Mock
+  private DAG mockDag;
+
+  @Mock
+  private Context mockContext;
+
+  @Mock
+  private PerfLogger mockPerfLogger;
+
+  @Mock
+  private YarnClient mockYarnClient;
+
+  @Mock
+  private TezCounters mockCounters;
+
+  private HiveConf hiveConf;
+  private List<BaseWork> topSortedWorks;
+  private SessionState sessionState;
+
+  /** Handle returned by openMocks; closed in tearDown so mock state is released per test. */
+  private AutoCloseable mocks;
+
+  @Before
+  public void setUp() {
+    mocks = MockitoAnnotations.openMocks(this);
+    hiveConf = new HiveConfForTest(TestTezJobMonitorQueueMetrics.class);
+    hiveConf.set("hive.security.authorization.manager",
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
+    // Initialize SessionState - required by TezJobMonitor constructor
+    sessionState = SessionState.start(hiveConf);
+    topSortedWorks = new ArrayList<>();
+    when(mockDag.getName()).thenReturn("test-dag-1");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (sessionState != null) {
+        sessionState.close();
+      }
+    } finally {
+      // Release the mocks even when closing the session fails.
+      if (mocks != null) {
+        mocks.close();
+      }
+    }
+  }
+
+  /**
+   * Builds a TezJobMonitor from the mocks and the current hiveConf.
+   *
+   * Every test goes through this helper: the verifications below only mean something once
+   * the monitor constructor has actually run against the stubbed session, because Mockito
+   * does not count the calls made inside when(...) as invocations.
+   *
+   * @return a new monitor wired to the mocked session, DAG client and counters
+   */
+  private TezJobMonitor newMonitor() {
+    return new TezJobMonitor(
+        mockSession, topSortedWorks, mockDagClient, hiveConf,
+        mockDag, mockContext, mockCounters, mockPerfLogger);
+  }
+
+  @Test
+  public void testMetricsCollectorDisabledByDefault() throws Exception {
+    // By default, refresh interval is 0s so metrics should be disabled
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn("default");
+
+    newMonitor();
+
+    // Metrics collector should not be initialized when interval is 0 (default)
+    verify(mockYarnClient, never()).getQueueInfo(anyString());
+  }
+
+  @Test
+  public void testMetricsCollectorEnabledWithInterval() {
+    // Enable metrics by setting a positive refresh interval
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        10, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn("default");
+
+    newMonitor();
+
+    // YarnClient should be retrieved when metrics are enabled
+    verify(mockSession, atLeastOnce()).getYarnClient();
+  }
+
+  @Test
+  public void testMetricsCollectorDisabledWithZeroInterval() throws Exception {
+    // Interval = 0 disables metrics (even without boolean flag)
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        0, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn("default");
+
+    newMonitor();
+
+    verify(mockYarnClient, never()).getQueueInfo(anyString());
+  }
+
+  @Test
+  public void testMetricsCollectorDisabledWithNegativeInterval() throws Exception {
+    // Negative interval also disables metrics
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        -1, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn("default");
+
+    newMonitor();
+
+    verify(mockYarnClient, never()).getQueueInfo(anyString());
+  }
+
+  @Test
+  public void testMetricsCollectorWithNullYarnClient() {
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        10, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(null);
+    when(mockSession.getQueueName()).thenReturn("default");
+
+    TezJobMonitor monitor = newMonitor();
+
+    verify(mockSession, atLeastOnce()).getYarnClient();
+    assertNotNull("Monitor should be created", monitor);
+  }
+
+  @Test
+  public void testMetricsCollectorWithNullQueueName() {
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        10, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn(null);
+
+    TezJobMonitor monitor = newMonitor();
+
+    verify(mockSession, atLeastOnce()).getQueueName();
+    assertNotNull("Monitor should be created", monitor);
+  }
+
+  @Test
+  public void testMetricsCollectorWithEmptyQueueName() {
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        10, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn(" ");
+
+    TezJobMonitor monitor = newMonitor();
+
+    verify(mockSession, atLeastOnce()).getQueueName();
+    assertNotNull("Monitor should be created", monitor);
+  }
+
+  @Test
+  public void testMetricsCollectorWithSmallInterval() {
+    // Interval below 1000ms is adjusted to 1000ms minimum
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        500, java.util.concurrent.TimeUnit.MILLISECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn("default");
+
+    TezJobMonitor monitor = newMonitor();
+
+    assertNotNull("Monitor should be created with adjusted interval", monitor);
+  }
+
+  @Test
+  public void testMetricsCollectorWithCustomQueue() {
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL,
+        15, java.util.concurrent.TimeUnit.SECONDS);
+
+    when(mockSession.getYarnClient()).thenReturn(mockYarnClient);
+    when(mockSession.getQueueName()).thenReturn("production.analytics");
+
+    TezJobMonitor monitor = newMonitor();
+
+    verify(mockSession, atLeastOnce()).getQueueName();
+    assertNotNull("Monitor should be created with custom queue", monitor);
+  }
+}
+
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java new file mode 100644 index 000000000000..b9bf53364773 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.exec.tez.YarnQueueMetricsCollector;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueStatistics;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test cases for TezProgressMonitor queue metrics functionality.
+ *
+ * The collector and its snapshot are Mockito mocks, so each test controls exactly what
+ * TezProgressMonitor.queueMetrics() sees and checks only the text that method returns.
+ */
+public class TestTezProgressMonitorQueueMetrics {
+
+  @Mock
+  private DAGClient mockDagClient;
+
+  @Mock
+  private DAGStatus mockDagStatus;
+
+  @Mock
+  private YarnQueueMetricsCollector mockMetricsCollector;
+
+  @Mock
+  private YarnQueueMetricsCollector.QueueMetricsSnapshot mockSnapshot;
+
+  @Mock
+  private SessionState.LogHelper mockConsole;
+
+  @Mock
+  private QueueInfo mockQueueInfo;
+
+  @Mock
+  private QueueStatistics mockQueueStats;
+
+  /** Handle returned by openMocks; closed in tearDown so mock state is released per test. */
+  private AutoCloseable mocks;
+
+  @Before
+  public void setUp() {
+    mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (mocks != null) {
+      mocks.close();
+    }
+  }
+
+  /**
+   * Builds a TezProgressMonitor for a RUNNING DAG with no work items and no vertex progress.
+   *
+   * @param collector metrics collector handed to the monitor; may be null
+   * @return the monitor under test
+   * @throws Exception if the monitor constructor fails
+   */
+  private TezProgressMonitor newMonitor(YarnQueueMetricsCollector collector) throws Exception {
+    List<BaseWork> works = new ArrayList<>();
+    Map<String, Progress> progressMap = new HashMap<>();
+
+    when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING);
+
+    return new TezProgressMonitor(
+        mockDagClient, mockDagStatus, works, progressMap, mockConsole,
+        System.currentTimeMillis(), collector);
+  }
+
+  @Test
+  public void testQueueMetricsWithNullCollector() throws Exception {
+    TezProgressMonitor monitor = newMonitor(null);
+
+    String result = monitor.queueMetrics();
+
+    assertEquals("Should return empty string when collector is null", "", result);
+  }
+
+  @Test
+  public void testQueueMetricsWithNullSnapshot() throws Exception {
+    when(mockMetricsCollector.getLatestSnapshot()).thenReturn(null);
+
+    TezProgressMonitor monitor = newMonitor(mockMetricsCollector);
+
+    String result = monitor.queueMetrics();
+
+    assertEquals("Should return 'unavailable' when snapshot is null",
+        "QUEUE: unavailable", result);
+  }
+
+  @Test
+  public void testQueueMetricsFormatting() throws Exception {
+    // Setup snapshot with known values
+    long now = System.currentTimeMillis();
+    when(mockSnapshot.getMemoryUsedGB()).thenReturn(8.5f);
+    when(mockSnapshot.getMemoryTotalGB()).thenReturn(16.0f);
+    when(mockSnapshot.getMemoryPercentage()).thenReturn("53.12%");
+    when(mockSnapshot.getVCoresUsed()).thenReturn(100);
+    when(mockSnapshot.getVCoresTotal()).thenReturn(200);
+    when(mockSnapshot.getVCoresPercentage()).thenReturn("50.00%");
+    when(mockSnapshot.getCapacityPercentage()).thenReturn(25.0f);
+    when(mockSnapshot.getRunningApps()).thenReturn(5);
+    when(mockSnapshot.getPendingContainers()).thenReturn(10);
+    when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 5000); // 5 seconds ago
+
+    when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot);
+    when(mockMetricsCollector.getQueueName()).thenReturn("default");
+
+    TezProgressMonitor monitor = newMonitor(mockMetricsCollector);
+
+    String result = monitor.queueMetrics();
+
+    // Verify 3-line format
+    String[] lines = result.split("\n");
+    assertEquals("Should have 3 lines", 3, lines.length);
+
+    // Line 1: Queue name + staleness
+    assertTrue("Line 1 should contain queue name", lines[0].contains("QUEUE: default"));
+    assertTrue("Line 1 should contain staleness", lines[0].contains("s ago"));
+
+    // Line 2: Memory + VCores
+    assertTrue("Line 2 should contain memory info", lines[1].contains("MEMORY: 8.5/16.0 GB"));
+    assertTrue("Line 2 should contain memory percentage", lines[1].contains("53.12%"));
+    assertTrue("Line 2 should contain vCores info", lines[1].contains("VCORES: 100/200"));
+    assertTrue("Line 2 should contain vCores percentage", lines[1].contains("50.00%"));
+
+    // Line 3: Capacity + Apps + Pending
+    assertTrue("Line 3 should contain capacity", lines[2].contains("CAPACITY: 25.00%"));
+    assertTrue("Line 3 should contain running apps", lines[2].contains("ACTIVE_APPS: 5"));
+    assertTrue("Line 3 should contain pending containers", lines[2].contains("PENDING: 10"));
+  }
+
+  @Test
+  public void testQueueMetricsStalenessBeyond60Seconds() throws Exception {
+    long now = System.currentTimeMillis();
+    when(mockSnapshot.getMemoryUsedGB()).thenReturn(1.0f);
+    when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f);
+    when(mockSnapshot.getMemoryPercentage()).thenReturn("10.00%");
+    when(mockSnapshot.getVCoresUsed()).thenReturn(10);
+    when(mockSnapshot.getVCoresTotal()).thenReturn(100);
+    when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%");
+    when(mockSnapshot.getCapacityPercentage()).thenReturn(10.0f);
+    when(mockSnapshot.getRunningApps()).thenReturn(1);
+    when(mockSnapshot.getPendingContainers()).thenReturn(0);
+    when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 120000); // 120 seconds ago
+
+    when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot);
+    when(mockMetricsCollector.getQueueName()).thenReturn("default");
+
+    TezProgressMonitor monitor = newMonitor(mockMetricsCollector);
+
+    String result = monitor.queueMetrics();
+
+    String[] lines = result.split("\n");
+    assertEquals("Should have 3 lines", 3, lines.length);
+    // Staleness now on line 1 with queue name
+    assertTrue("Line 1 should show >60s ago for stale metrics", lines[0].contains(">60s ago"));
+  }
+
+  @Test
+  public void testQueueNameTruncation() throws Exception {
+    long now = System.currentTimeMillis();
+    when(mockSnapshot.getMemoryUsedGB()).thenReturn(1.0f);
+    when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f);
+    when(mockSnapshot.getMemoryPercentage()).thenReturn("10.00%");
+    when(mockSnapshot.getVCoresUsed()).thenReturn(10);
+    when(mockSnapshot.getVCoresTotal()).thenReturn(100);
+    when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%");
+    when(mockSnapshot.getCapacityPercentage()).thenReturn(10.0f);
+    when(mockSnapshot.getRunningApps()).thenReturn(1);
+    when(mockSnapshot.getPendingContainers()).thenReturn(0);
+    when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 1000);
+
+    when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot);
+    // Very long queue name
+    when(mockMetricsCollector.getQueueName()).thenReturn(
+        "root.production.analytics.data-engineering.team-alpha.project-beta");
+
+    TezProgressMonitor monitor = newMonitor(mockMetricsCollector);
+
+    String result = monitor.queueMetrics();
+
+    String[] lines = result.split("\n");
+    assertEquals("Should have 3 lines", 3, lines.length);
+
+    // Line 1 should not exceed separator width (94 chars)
+    assertTrue("Line 1 should not exceed 94 characters", lines[0].length() <= 94);
+
+    // The name is only shortened when line 1 would otherwise overflow. If the "..." marker
+    // is present, the full name must be gone; if it is absent, the name fitted as-is.
+    if (lines[0].contains("...")) {
+      assertFalse("Full long queue name should not appear",
+          lines[0].contains("root.production.analytics.data-engineering.team-alpha.project-beta"));
+    }
+
+    // Line 1 should always contain staleness
+    assertTrue("Line 1 should contain staleness", lines[0].contains("s ago"));
+
+    // Line 2 should contain resource info
+    assertTrue("Line 2 should contain MEMORY", lines[1].contains("MEMORY:"));
+
+    // Line 3 should contain capacity
+    assertTrue("Line 3 should contain CAPACITY", lines[2].contains("CAPACITY:"));
+  }
+
+  @Test
+  public void testQueueMetricsWithZeroPercentages() throws Exception {
+    long now = System.currentTimeMillis();
+    when(mockSnapshot.getMemoryUsedGB()).thenReturn(0.0f);
+    when(mockSnapshot.getMemoryTotalGB()).thenReturn(0.0f);
+    when(mockSnapshot.getMemoryPercentage()).thenReturn("N/A");
+    when(mockSnapshot.getVCoresUsed()).thenReturn(0);
+    when(mockSnapshot.getVCoresTotal()).thenReturn(0);
+    when(mockSnapshot.getVCoresPercentage()).thenReturn("N/A");
+    when(mockSnapshot.getCapacityPercentage()).thenReturn(0.0f);
+    when(mockSnapshot.getRunningApps()).thenReturn(0);
+    when(mockSnapshot.getPendingContainers()).thenReturn(0);
+    when(mockSnapshot.getCollectionTimestamp()).thenReturn(now);
+
+    when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot);
+    when(mockMetricsCollector.getQueueName()).thenReturn("empty");
+
+    TezProgressMonitor monitor = newMonitor(mockMetricsCollector);
+
+    String result = monitor.queueMetrics();
+
+    String[] lines = result.split("\n");
+    assertEquals("Should have 3 lines", 3, lines.length);
+    // Line 1: queue name + staleness
+    assertTrue("Line 1 should contain queue name", lines[0].contains("QUEUE: empty"));
+    // Line 2: memory + vcores with N/A
+    assertTrue("Line 2 should contain N/A for memory percentage", lines[1].contains("N/A"));
+    assertTrue("Line 2 should handle zero values", lines[1].contains("0.0/0.0 GB"));
+    // Line 3: capacity + apps + pending
+    assertTrue("Line 3 should contain capacity", lines[2].contains("CAPACITY:"));
+  }
+
+  @Test
+  public void testQueueMetricsExceptionHandling() throws Exception {
+    when(mockMetricsCollector.getLatestSnapshot()).thenThrow(
+        new RuntimeException("Unexpected error"));
+
+    TezProgressMonitor monitor = newMonitor(mockMetricsCollector);
+
+    String result = monitor.queueMetrics();
+
+    // Should gracefully handle exceptions and return unavailable
+    assertEquals("Should return unavailable on exception",
+        "QUEUE: unavailable", result);
+  }
+}
+
diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift
index a399e66445c8..c4a53f9b71a0 100644
--- a/service-rpc/if/TCLIService.thrift
+++ b/service-rpc/if/TCLIService.thrift
@@ -1286,6 +1286,7 @@ struct TProgressUpdateResp {
   4: required TJobExecutionStatus status
   5: required string footerSummary
   6: required i64 startTime
+  7: optional string queueMetrics
 }
 
 struct TGetQueryIdReq {
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java
index 8b2f6a21629e..ef787a15d080 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java
+++ 
b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java @@ -17,6 +17,7 @@ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.protocol.TField FOOTER_SUMMARY_FIELD_DESC = new org.apache.thrift.protocol.TField("footerSummary", org.apache.thrift.protocol.TType.STRING, (short)5); private static final org.apache.thrift.protocol.TField START_TIME_FIELD_DESC = new org.apache.thrift.protocol.TField("startTime", org.apache.thrift.protocol.TType.I64, (short)6); + private static final org.apache.thrift.protocol.TField QUEUE_METRICS_FIELD_DESC = new org.apache.thrift.protocol.TField("queueMetrics", org.apache.thrift.protocol.TType.STRING, (short)7); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TProgressUpdateRespStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TProgressUpdateRespTupleSchemeFactory(); @@ -27,6 +28,7 @@ private @org.apache.thrift.annotation.Nullable TJobExecutionStatus status; // required private @org.apache.thrift.annotation.Nullable java.lang.String footerSummary; // required private long startTime; // required + private @org.apache.thrift.annotation.Nullable java.lang.String queueMetrics; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -39,7 +41,8 @@ public enum _Fields implements org.apache.thrift.TFieldIdEnum { */ STATUS((short)4, "status"), FOOTER_SUMMARY((short)5, "footerSummary"), - START_TIME((short)6, "startTime"); + START_TIME((short)6, "startTime"), + QUEUE_METRICS((short)7, "queueMetrics"); private static final java.util.Map byName = new java.util.HashMap(); @@ -67,6 +70,8 @@ public static _Fields findByThriftId(int fieldId) { return FOOTER_SUMMARY; case 6: // START_TIME return START_TIME; + case 7: // QUEUE_METRICS + return QUEUE_METRICS; default: return null; } @@ -129,6 +134,8 @@ public java.lang.String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.START_TIME, new org.apache.thrift.meta_data.FieldMetaData("startTime", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.QUEUE_METRICS, new org.apache.thrift.meta_data.FieldMetaData("queueMetrics", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TProgressUpdateResp.class, metaDataMap); } @@ -180,6 +187,9 @@ public TProgressUpdateResp(TProgressUpdateResp other) { this.footerSummary = other.footerSummary; } this.startTime = other.startTime; + if (other.isSetQueueMetrics()) { + this.queueMetrics = other.queueMetrics; + } } public TProgressUpdateResp deepCopy() { @@ -196,6 +206,7 @@ public void clear() { this.footerSummary = null; setStartTimeIsSet(false); this.startTime = 0; + this.queueMetrics = 
null; } public int getHeaderNamesSize() { @@ -378,6 +389,30 @@ public void setStartTimeIsSet(boolean value) { __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __STARTTIME_ISSET_ID, value); } + @org.apache.thrift.annotation.Nullable + public java.lang.String getQueueMetrics() { + return this.queueMetrics; + } + + public void setQueueMetrics(@org.apache.thrift.annotation.Nullable java.lang.String queueMetrics) { + this.queueMetrics = queueMetrics; + } + + public void unsetQueueMetrics() { + this.queueMetrics = null; + } + + /** Returns true if field queueMetrics is set (has been assigned a value) and false otherwise */ + public boolean isSetQueueMetrics() { + return this.queueMetrics != null; + } + + public void setQueueMetricsIsSet(boolean value) { + if (!value) { + this.queueMetrics = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case HEADER_NAMES: @@ -428,6 +463,14 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable } break; + case QUEUE_METRICS: + if (value == null) { + unsetQueueMetrics(); + } else { + setQueueMetrics((java.lang.String)value); + } + break; + } } @@ -452,6 +495,9 @@ public java.lang.Object getFieldValue(_Fields field) { case START_TIME: return getStartTime(); + case QUEUE_METRICS: + return getQueueMetrics(); + } throw new java.lang.IllegalStateException(); } @@ -475,6 +521,8 @@ public boolean isSet(_Fields field) { return isSetFooterSummary(); case START_TIME: return isSetStartTime(); + case QUEUE_METRICS: + return isSetQueueMetrics(); } throw new java.lang.IllegalStateException(); } @@ -546,6 +594,15 @@ public boolean equals(TProgressUpdateResp that) { return false; } + boolean this_present_queueMetrics = true && this.isSetQueueMetrics(); + boolean that_present_queueMetrics = true && that.isSetQueueMetrics(); + if (this_present_queueMetrics || that_present_queueMetrics) { + if 
(!(this_present_queueMetrics && that_present_queueMetrics)) + return false; + if (!this.queueMetrics.equals(that.queueMetrics)) + return false; + } + return true; } @@ -573,6 +630,10 @@ public int hashCode() { hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(startTime); + hashCode = hashCode * 8191 + ((isSetQueueMetrics()) ? 131071 : 524287); + if (isSetQueueMetrics()) + hashCode = hashCode * 8191 + queueMetrics.hashCode(); + return hashCode; } @@ -851,6 +912,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TProgressUpdateResp org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // QUEUE_METRICS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.queueMetrics = iprot.readString(); + struct.setQueueMetricsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -911,6 +980,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TProgressUpdateRes oprot.writeFieldBegin(START_TIME_FIELD_DESC); oprot.writeI64(struct.startTime); oprot.writeFieldEnd(); + if (struct.queueMetrics != null) { + oprot.writeFieldBegin(QUEUE_METRICS_FIELD_DESC); + oprot.writeString(struct.queueMetrics); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } diff --git a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java index 332ef2dace64..d3693128bbc4 100644 --- a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java +++ b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java @@ -28,21 +28,23 @@ public class JobProgressUpdate { private final List headers; private final List> rows; public final String status; + public final String queueMetrics; JobProgressUpdate(ProgressMonitor monitor) { 
this(monitor.headers(), monitor.rows(), monitor.footerSummary(), monitor.progressedPercentage(), - monitor.startTime(), monitor.executionStatus()); + monitor.startTime(), monitor.executionStatus(), monitor.queueMetrics()); } private JobProgressUpdate(List headers, List> rows, String footerSummary, - double progressedPercentage, long startTimeMillis, String status) { + double progressedPercentage, long startTimeMillis, String status, String queueMetrics) { this.progressedPercentage = progressedPercentage; this.footerSummary = footerSummary; this.startTimeMillis = startTimeMillis; this.headers = headers; this.rows = rows; this.status = status; + this.queueMetrics = queueMetrics; } public List headers() { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index ccf576fe50d3..ab18592a9631 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -813,14 +813,18 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th } TJobExecutionStatus executionStatus = mapper.forStatus(progressUpdate.status); - resp.setProgressUpdateResponse(new TProgressUpdateResp( + TProgressUpdateResp tProgressUpdateResp = new TProgressUpdateResp( progressUpdate.headers(), progressUpdate.rows(), progressUpdate.progressedPercentage, executionStatus, progressUpdate.footerSummary, progressUpdate.startTimeMillis - )); + ); + if (progressUpdate.queueMetrics != null && !progressUpdate.queueMetrics.isEmpty()) { + tProgressUpdateResp.setQueueMetrics(progressUpdate.queueMetrics); + } + resp.setProgressUpdateResponse(tProgressUpdateResp); if (opException != null) { resp.setSqlState(opException.getSQLState()); resp.setErrorCode(opException.getErrorCode());