检查依赖数据到达后定时触发任务
发布日期:2021-04-30 21:06:27
浏览次数:107
分类:精选文章
本文共 3347 字,大约阅读时间需要 11 分钟。
YARN???????? # ???????? # 1. ??YARN???????? declare -a task_queues=() # 2. ??YARN?????? appStates=(NEW NEW_SAVING SUBMITTED ACCEPTED RUNNING) # 3. ??YARN??????????????????? function IsExistsYarnTask() { # ??????????????? for appState in ${appStates[@]}; do runningAppIds=$(yarn application -list -appStates $appState | grep application_ | grep -v yarn-ats | awk '{print $1}') if [[ -z ${runningAppIds[@]} ]]; then return 1 fi done return 0 } # 4. ????????????????? function push2TaskQueues(task_name) { # ??????????????? if [[ "${task_queues[@]}" =~ "${task_name}" ]]; then # ????????1ms usleep 1 else # ???????????? task_queues[${#task_queues[@]}]=${task_name} fi } # 5. ????????????????? function IsInTaskQueues(task_name) { if [[ "${task_queues[@]}" =~ "${task_name}" ]]; then return 1 else return 0 fi } # 6. ????????YARN function submitJob2Yarn(task_name) { # ??YARN?????????????????? isNotExists=IsExistsYarnTask # ??????????????? isNotIn=IsInTaskQueues task_name if [[ $isNotExists -eq 0 && $isNotIn -eq 0 ]]; then echo "???????${task_name}" # ?????????ID appId=0 while [[ $appId -eq 0 ]]; do appId=$(yarn application -list -appStates FINISHED | grep application_ | grep -v yarn-ats | awk '{print $1}') done # ???????????? push2TaskQueues task_name fi } # 7. ??23??????? function cleanTaskQueues() { cleanHour=$(date +{%H}) if [[ ${cleanHour} -eq 23 ]]; then task_queues=() fi } # 8. ???????????? task_name=${1} str_tables=${2} declare -a base_tables=(${str_tables//,/}) # 9. ???????? dt=$(date +"%Y%m%d" -d "-2day") # 10. ??????? mysql_conn="mysql -h${HOST_NAME} -P${PORT} -u${USER_NAME} -p${PASSWORD} -D${DBNAME} -s -e" # 11. ?????????????????? if [[ ${#base_tables[@]} -gt 0 ]]; then for ((i=0; i<${#base_tables[@]}; i++)); do query_sql="select count(id) from hdfs_file_desc where table_name='${base_tables[i]}' and arrive_date=${dt}" echo "??${base_tables[i]}??????${query_sql}" rs_count=$(mysql_conn "${query_sql}") if [[ ${rs_count} -gt 0 ]]; then let index+=1 continue else break fi done fi # 12. ????????????????? if [[ ${#base_tables[@]} -eq ${index} ]]; then echo "${task_name}: ?????${str_tables}???????????????..." else echo "${task_name}: ?????${str_tables}?????????????..." fi
发表评论
最新留言
感谢大佬
[***.8.128.20]2026年06月01日 19时26分10秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
php实现根据身份证获取年龄
2023-03-01
PHP实现的MongoDB数据增删改查
2023-03-01
PHP实现的SSO单点登录系统,拿走就用吧
2023-03-01
php实现短信验证功能
2023-03-01
php实现逆转数组
2023-03-01
PHP实现通过geoip获取IP地理信息
2023-03-01
PHP实现页面静态化、纯静态化及伪静态化
2023-03-01
php容许ajax跨域,PHP设置允许ajax跨域请求的两种常见方法
2023-03-01
RabbitMQ进程结构分析与性能调优
2023-03-01
PHP对接百度地图
2023-03-01
PHP对表单提交特殊字符的过滤和处理
2023-03-01
php对象引用和析构函数的关系
2023-03-01
RabbitMQ HTTP 认证后端项目常见问题解决方案
2023-03-01
PHP将图片转换成base64格式(优缺点)
2023-03-01
php将多个值的数组去除重复元素
2023-03-01
php局域网上传文件_PHP如何通过CURL上传文件
2023-03-01
PHP工具插件大全
2023-03-01
php布尔值的++
2023-03-01
PHP常量、变量作用域详解(一)
2023-03-01