检查依赖数据到达后定时触发任务
发布日期:2021-04-30 21:06:27 浏览次数:107 分类:精选文章

本文共 3347 字,大约阅读时间需要 11 分钟。

YARN 任务调度脚本
        # Script body
# 1. Queue of task names recorded by push2TaskQueues; starts out empty.
task_queues=()
# 2. YARN application states that IsExistsYarnTask queries, one per call.
declare -a appStates=(
  NEW
  NEW_SAVING
  SUBMITTED
  ACCEPTED
  RUNNING
)
# 3. Report whether YARN lists any application in one of the states held in
#    the global appStates array (entries containing "yarn-ats" are ignored).
# Globals:   appStates (read)
# Arguments: none
# Returns:   1 if at least one application is listed, 0 if none is.
#            NOTE: this is the inverse of the usual shell convention; it
#            follows IsInTaskQueues (1 = present) and the "-eq 0 means
#            nothing is there" check used by submitJob2Yarn.
function IsExistsYarnTask() {
  local appState runningAppIds
  for appState in "${appStates[@]}"; do
    runningAppIds=$(yarn application -list -appStates "${appState}" \
      | grep application_ \
      | grep -v yarn-ats \
      | awk '{print $1}')
    # A non-empty listing in any single state is enough to answer "exists";
    # an empty one must not end the scan, the remaining states still count.
    if [[ -n "${runningAppIds}" ]]; then
      return 1
    fi
  done
  return 0
}
# 4. Append a task name to the global task_queues array unless an identical
#    entry is already stored.
# Globals:   task_queues (read, written)
# Arguments: $1 - task name; an empty name is ignored
# Returns:   0
function push2TaskQueues() {
  local task_name=$1
  local queued
  # An empty name is never stored.
  [[ -n "${task_name}" ]] || return 0
  for queued in "${task_queues[@]}"; do
    # Compare whole entries: a substring test against the joined array would
    # report "job" as queued as soon as "job_1" is.
    if [[ "${queued}" == "${task_name}" ]]; then
      return 0
    fi
  done
  task_queues+=("${task_name}")
}
# 5. Check whether a task name is stored in the global task_queues array.
# Globals:   task_queues (read)
# Arguments: $1 - task name
# Returns:   1 if the name is in the queue, 0 if it is not.
#            NOTE: inverse of the usual shell convention; callers test for 0
#            to mean "not queued".
function IsInTaskQueues() {
  local task_name=$1
  local queued
  for queued in "${task_queues[@]}"; do
    # Whole-entry comparison, so "job" is not mistaken for "job_1".
    if [[ "${queued}" == "${task_name}" ]]; then
      return 1
    fi
  done
  return 0
}
# 6. When IsExistsYarnTask and IsInTaskQueues both return 0 for the task,
#    log it, wait until YARN lists an application in state FINISHED, then
#    record the task name via push2TaskQueues.
# Globals:   appId (written: first column of the FINISHED listing)
#            YARN_POLL_INTERVAL (read, optional: seconds between polls,
#            default 1)
# Arguments: $1 - task name
# Outputs:   one log line on stdout when the task is handled
# Returns:   0 when the task is skipped, otherwise the status of
#            push2TaskQueues
# NOTE(review): no command that actually submits a job is visible here, and
# the FINISHED listing is not filtered by task, so any finished application
# ends the wait. Confirm the intended submit step with the author.
function submitJob2Yarn() {
  local task_name=$1
  # Both helpers return 0 for the negative answer ("nothing on YARN",
  # "not queued yet"), so plain success of both means "go ahead".
  if IsExistsYarnTask && IsInTaskQueues "${task_name}"; then
    echo "???????${task_name}"
    appId=""
    # Test for an empty string: an application ID is not a number, and a
    # numeric comparison against 0 would never let this loop finish.
    while [[ -z "${appId}" ]]; do
      appId=$(yarn application -list -appStates FINISHED \
        | grep application_ \
        | grep -v yarn-ats \
        | awk '{print $1}')
      # Pause between polls so YARN is not queried in a tight loop.
      [[ -n "${appId}" ]] || sleep "${YARN_POLL_INTERVAL:-1}"
    done
    push2TaskQueues "${task_name}"
  fi
}
# 7. Empty the global task_queues array when the current hour is 23.
# Globals:   task_queues (written)
# Arguments: none
# Returns:   0
function cleanTaskQueues() {
  local cleanHour
  cleanHour=$(date +%H)
  # Compare as text: %H is zero-padded, and "08"/"09" are rejected as
  # invalid octal numbers by an arithmetic -eq test.
  if [[ "${cleanHour}" == "23" ]]; then
    task_queues=()
  fi
}
# 8-12. Entry point: check in MySQL table hdfs_file_desc whether every table
#       the task depends on has at least one row for the date two days ago,
#       and log the outcome.
# Globals:   HOST_NAME, PORT, USER_NAME, PASSWORD, DBNAME (read)
#            task_name, str_tables, base_tables, dt, mysql_conn, index,
#            query_sql, rs_count (written)
# Arguments: $1 - task name
#            $2 - comma-separated list of table names
# Outputs:   the query issued per table and a final status line on stdout
# Returns:   0
# NOTE(review): the password is passed on the mysql command line and is
# therefore visible in the process list; table names are interpolated into
# the SQL text unescaped, so they must come from a trusted caller.
# NOTE(review): the "all arrived" branch only logs; presumably the job should
# be submitted there (e.g. via submitJob2Yarn) - confirm with the author.
function main() {
  local i

  # 8. Task name and the tables it depends on.
  task_name=${1}
  str_tables=${2}
  base_tables=()
  # Split on commas (and surrounding blanks) into one array element per table.
  IFS=', ' read -r -a base_tables <<< "${str_tables}"

  # 9. Date whose arrival is checked: two days before today (GNU date syntax).
  dt=$(date +"%Y%m%d" -d "-2day")

  # 10. MySQL client invocation, kept as an array so every option stays a
  #     single word even if a value contains blanks.
  mysql_conn=(mysql -h"${HOST_NAME}" -P"${PORT}" -u"${USER_NAME}"
    -p"${PASSWORD}" -D"${DBNAME}" -s -e)

  # 11. Count the leading tables whose data has arrived; stop at the first
  #     one that has not. index must start at 0 so that an empty table list
  #     compares cleanly below.
  index=0
  for ((i = 0; i < ${#base_tables[@]}; i++)); do
    query_sql="select count(id) from hdfs_file_desc where table_name='${base_tables[i]}' and arrive_date=${dt}"
    echo "??${base_tables[i]}??????${query_sql}"
    rs_count=$("${mysql_conn[@]}" "${query_sql}")
    # Anything that is not a positive integer (including an error message or
    # empty output from a failed query) counts as "not arrived".
    if [[ "${rs_count}" =~ ^[0-9]+$ ]] && ((rs_count > 0)); then
      index=$((index + 1))
    else
      break
    fi
  done

  # 12. All tables arrived exactly when every one of them was counted.
  if [[ ${#base_tables[@]} -eq ${index} ]]; then
    echo "${task_name}: ?????${str_tables}???????????????..."
  else
    echo "${task_name}: ?????${str_tables}?????????????..."
  fi
}

main "$@"
上一篇:所有文件的路径都在一个配置文件中的处理
下一篇:动态规划--Leetcode64.最小路径和

发表评论

最新留言

感谢大佬
[***.8.128.20]2026年06月01日 19时26分10秒