Skip to content

Commit

Permalink
HIVE-28623: Implement RuntimeContext to decouple runtime data from Te…
Browse files Browse the repository at this point in the history
…zTask
  • Loading branch information
abstractdog committed Nov 12, 2024
1 parent e48cb8e commit b4197e7
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 24 deletions.
9 changes: 9 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.TezRuntimeContext;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;

Expand Down Expand Up @@ -78,6 +81,8 @@ public class DriverContext {
private String operationId;
private String queryErrorMessage;

private TezRuntimeContext runtimeContext;

public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner,
HiveTxnManager initTxnManager) {
this.queryState = queryState;
Expand Down Expand Up @@ -120,6 +125,10 @@ public HiveTxnManager getInitTxnManager() {
}

/**
 * Returns the current query plan.
 *
 * Side effect: whenever a plan is present, the cached runtime context is refreshed from the first
 * TezTask that {@link Utilities#getFirstTezTask} reports for the plan's root tasks; it becomes
 * null when no such task exists. When there is no plan, the cached runtime context is left as is.
 *
 * @return the query plan, possibly null
 */
public QueryPlan getPlan() {
  if (plan == null) {
    return null;
  }
  // map() on an empty Optional stays empty, so "no TezTask" still ends up as a null context
  this.runtimeContext = Utilities.getFirstTezTask(plan.getRootTasks())
      .map(TezTask::getRuntimeContext)
      .orElse(null);
  return plan;
}

Expand Down
14 changes: 5 additions & 9 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,7 @@
import java.beans.Encoder;
import java.beans.Expression;
import java.beans.Statement;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
Expand Down Expand Up @@ -2822,6 +2814,10 @@ public static List<TezTask> getTezTasks(List<Task<?>> tasks) {
return getTasks(tasks, new TaskFilterFunction<>(TezTask.class));
}

/**
 * Returns the first element of {@link #getTezTasks} for the given tasks, if there is one.
 *
 * @param tasks the tasks handed over to {@link #getTezTasks}
 * @return the first TezTask, or an empty Optional when none was found
 */
public static Optional<TezTask> getFirstTezTask(List<Task<? extends Serializable>> tasks) {
  List<TezTask> tezTasks = getTezTasks(tasks);
  if (tezTasks.isEmpty()) {
    return Optional.empty();
  }
  // Optional.of keeps the "null element is an error" behaviour of Stream.findFirst
  return Optional.of(tezTasks.get(0));
}

/**
 * Collects the {@link ExecDriver} tasks out of the given tasks by delegating to getTasks with a
 * filter for the ExecDriver class.
 *
 * @param tasks the tasks to search
 * @return the ExecDriver tasks selected by the filter
 */
public static List<ExecDriver> getMRTasks(List<Task<?>> tasks) {
  TaskFilterFunction<ExecDriver> execDriverFilter = new TaskFilterFunction<>(ExecDriver.class);
  return getTasks(tasks, execDriverFilter);
}
Expand Down
108 changes: 108 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.tez;

import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounters;

/**
 * TezRuntimeContext is a holder for runtime information of a Tez execution (counters, DAG /
 * application / session identifiers, AM address, job monitor and execution mode). It is mainly
 * used by TezTask.
 */
public class TezRuntimeContext {
  // package protected: it's fine to be visible within the tez related package
  // may be null: nothing is stored until setCounters is called, and callers may reset it to null
  TezCounters counters;

  // dag id of the running dag
  private String dagId;
  // tez application id
  private String appId;
  // tez session id
  private String sessionId;
  // address (host:port) of the AM
  private String amAddress;
  private TezJobMonitor monitor;
  // llap/container
  private String executionMode;

  /**
   * @return the stored counters, or null when none are stored
   */
  public TezCounters getCounters() {
    return counters;
  }

  /**
   * @param counters the counters to store, null clears them
   */
  public void setCounters(TezCounters counters) {
    this.counters = counters;
  }

  /**
   * @return the dag id, or null if not set
   */
  public String getDagId() {
    return dagId;
  }

  public void setDagId(String dagId) {
    this.dagId = dagId;
  }

  /**
   * @return the tez session id, or null if not set
   */
  public String getSessionId() {
    return sessionId;
  }

  public void setSessionId(String sessionId) {
    this.sessionId = sessionId;
  }

  /**
   * @return the tez application id, or null if not set
   */
  public String getApplicationId() {
    return appId;
  }

  public void setApplicationId(String appId) {
    this.appId = appId;
  }

  /**
   * @return the execution mode (llap/container), or null if not set
   */
  public String getExecutionMode() {
    return executionMode;
  }

  public void setExecutionMode(String executionMode) {
    this.executionMode = executionMode;
  }

  /**
   * @return the AM address as "host:port", or null until {@link #initFromTezClient} was called
   */
  public String getAmAddress() {
    return amAddress;
  }

  /**
   * Records the AM address ("host:port") as reported by the given client.
   *
   * @param tezClient the client to read the AM host and port from
   */
  public void initFromTezClient(TezClient tezClient) {
    this.amAddress = tezClient.getAmHost() + ":" + tezClient.getAmPort();
  }

  public TezJobMonitor getMonitor() {
    return monitor;
  }

  public void setMonitor(TezJobMonitor monitor) {
    this.monitor = monitor;
  }

  /**
   * Looks up the value of a single counter.
   *
   * @param groupName name of the counter group
   * @param counterName name of the counter within the group
   * @return the counter value, or 0 when no counters are stored or the group is not available
   */
  public long getCounter(String groupName, String counterName) {
    // read the field once: counters can legitimately be null (never set, or reset by the owner),
    // which is reported as 0 just like a missing group instead of failing with an NPE
    TezCounters currentCounters = getCounters();
    if (currentCounters == null) {
      return 0;
    }
    CounterGroup group = currentCounters.getGroup(groupName);
    if (group == null) {
      return 0;
    }
    return group.findCounter(counterName, true).getValue();
  }
}
50 changes: 35 additions & 15 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class TezTask extends Task<TezWork> {
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction";

private TezCounters counters;
private final TezRuntimeContext runtimeContext = new TezRuntimeContext();

private final DagUtils utils;

Expand All @@ -134,11 +134,15 @@ public TezTask(DagUtils utils) {
}

public TezCounters getTezCounters() {
return counters;
return runtimeContext.getCounters();
}

public void setTezCounters(final TezCounters counters) {
this.counters = counters;
runtimeContext.setCounters(counters);
}

/**
 * @return the runtime context instance held by this task
 */
public TezRuntimeContext getRuntimeContext() {
  return this.runtimeContext;
}

/**
Expand Down Expand Up @@ -196,6 +200,8 @@ public int execute() {
ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));

WmContext wmContext = ctx.getWmContext();
String executionMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
runtimeContext.setExecutionMode(executionMode);
// jobConf will hold all the configuration for hadoop, tez, and hive, which are not set in AM defaults
JobConf jobConf = utils.createConfiguration(conf, false);

Expand Down Expand Up @@ -261,13 +267,19 @@ public int execute() {

// Log all the info required to find the various logs for this query
String dagId = this.dagClient.getDagIdentifierString();
LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG Session ID: [{}]", ServerUtils.hostname(), queryId,
dagId, this.dagClient.getSessionIdentifierString());
String appId = this.dagClient.getSessionIdentifierString();
LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG App ID: [{}]", ServerUtils.hostname(), queryId,
dagId, appId);
LogUtils.putToMDC(LogUtils.DAGID_KEY, dagId);
this.jobID = dagId;
runtimeContext.setDagId(dagId);
runtimeContext.setSessionId(session.getSessionId());
runtimeContext.setApplicationId(appId);

// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx, counters, perfLogger);
TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx, runtimeContext.counters,
perfLogger);
runtimeContext.setMonitor(monitor);
rc = monitor.monitorExecution();

if (rc != 0) {
Expand All @@ -283,12 +295,13 @@ public int execute() {
TezCounters dagCounters = dagStatus.getDAGCounters();

// if initial counters exists, merge it with dag counters to get aggregated view
TezCounters mergedCounters = counters == null ? dagCounters : Utils.mergeTezCounters(dagCounters, counters);
counters = mergedCounters;
TezCounters mergedCounters = runtimeContext.counters == null ? dagCounters : Utils.mergeTezCounters(
dagCounters, runtimeContext.counters);
runtimeContext.counters = mergedCounters;
} catch (Exception err) {
// Don't fail execution due to counters - just don't print summary info
LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete.", err);
counters = null;
runtimeContext.counters = null;
}

// save useful commit information into query state, e.g. for custom commit hooks, like Iceberg
Expand Down Expand Up @@ -320,10 +333,10 @@ public int execute() {
}
}

if (LOG.isInfoEnabled() && counters != null
if (LOG.isInfoEnabled() && runtimeContext.counters != null
&& (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
Utilities.isPerfOrAboveLogging(conf))) {
for (CounterGroup group: counters) {
for (CounterGroup group : runtimeContext.counters) {
LOG.info(group.getDisplayName() +":");
for (TezCounter counter: group) {
LOG.info(" "+counter.getDisplayName()+": "+counter.getValue());
Expand Down Expand Up @@ -407,8 +420,8 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
}

private void updateNumRows() {
if (counters != null) {
TezCounter counter = counters.findCounter(
if (runtimeContext.counters != null) {
TezCounter counter = runtimeContext.counters.findCounter(
conf.getVar(HiveConf.ConfVars.HIVE_COUNTER_GROUP), FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN);
if (counter != null) {
queryState.setNumModifiedRows(counter.getValue());
Expand Down Expand Up @@ -647,7 +660,7 @@ DAGClient submit(DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception
console.printInfo("Tez session was closed. Reopening...");
sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
console.printInfo("Session re-established.");
dagClient = sessionState.getSession().submitDAG(dag);
dagClient = submitInternal(dag, sessionState);
}
} catch (Exception e) {
if (this.isShutdown) {
Expand All @@ -660,7 +673,7 @@ DAGClient submit(DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception
console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+ Arrays.toString(e.getStackTrace()) + " retrying...");
sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
dagClient = sessionState.getSession().submitDAG(dag);
dagClient = submitInternal(dag, sessionState);
} catch (Exception retryException) {
// we failed to submit after retrying.
// If this is a non-pool session, destroy it.
Expand All @@ -674,6 +687,13 @@ DAGClient submit(DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception
return new SyncDagClient(dagClient);
}

/**
 * Submits the DAG through the Tez client of the given session state and, once the submission
 * returned, lets the runtime context pick up its data from that same client.
 *
 * @param dag the DAG to submit
 * @param sessionState the session state providing the Tez client
 * @return the client returned by the submission
 * @throws TezException declared for the submission call
 * @throws IOException declared for the submission call
 */
private DAGClient submitInternal(DAG dag, TezSessionState sessionState) throws TezException, IOException {
  TezClient client = sessionState.getSession();
  DAGClient submittedDagClient = client.submitDAG(dag);
  // only reached when submitDAG did not throw
  runtimeContext.initFromTezClient(client);
  return submittedDagClient;
}

private void sessionDestroyOrReturnToPool(Ref<TezSessionState> sessionStateRef,
TezSessionState sessionState) throws Exception{
sessionStateRef.value = null;
Expand Down

0 comments on commit b4197e7

Please sign in to comment.