Handling updates, deletes, and appends to historical data with event-driven Flink when the data volume is small
Published: 2025-06-19 16:15:19
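Approach: for every incoming record, write it to the database directly over JDBC according to which of three cases it is (update, delete, or append). Then, still inside the flatMap function, query the complete, current table over JDBC and emit it as the latest result. The sample below implements the append case only.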
package com.bai

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object JDBCtest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val ncStream = env.socketTextStream("hadoop104", 7777)
    ncStream.print("before")

    // Keep only lines of the form "<digits>,<anything>"
    val value1: DataStream[String] = ncStream.filter(_.matches("(\\d)+,(.)*"))
    value1.print("after")

    val value2: DataStream[Student] = value1.flatMap(new RichFlatMapFunction[String, Student] {
      // One connection per parallel task instance, opened once instead of once per record.
      // The original kept these as fields of the enclosing object, which lets parallel
      // subtasks in the same JVM close each other's connection.
      @transient private var conn: Connection = _
      @transient private var insertStmt: PreparedStatement = _
      @transient private var selectStmt: PreparedStatement = _

      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://hadoop103:3306/test", "root", "123456")
        insertStmt = conn.prepareStatement("insert into input (id, name) values (?,?)")
        selectStmt = conn.prepareStatement("select * from input")
      }

      override def flatMap(value: String, out: Collector[Student]): Unit = {
        val arr: Array[String] = value.split(",", 2)
        println(arr.toList)

        // Append the incoming record
        insertStmt.setInt(1, arr(0).toInt)
        insertStmt.setString(2, arr(1))
        insertStmt.execute()

        // Re-read the whole table and emit every row as the latest result
        val resultSet: ResultSet = selectStmt.executeQuery()
        try {
          val data: ResultSetMetaData = resultSet.getMetaData
          while (resultSet.next()) {
            val jSONObject = new JSONObject()
            for (i <- 1 to data.getColumnCount) {
              jSONObject.put(data.getColumnName(i), resultSet.getString(i))
            }
            out.collect(JSON.parseObject(jSONObject.toString(), classOf[Student]))
          }
        } finally {
          resultSet.close()
        }
      }

      override def close(): Unit = {
        if (selectStmt != null) selectStmt.close()
        if (insertStmt != null) insertStmt.close()
        if (conn != null) conn.close()
      }
    })

    value2.print("dd")
    env.execute()
  }
}

case class Student(id: Int, name: String)

The Maven dependencies are as follows:
Properties:
  encoding: UTF-8
  flink.version: 1.11.0
  scala.binary.version: 2.11
  scala.version: 2.11.12
  log4j.version: 2.12.1
  hadoop.version: 2.7.3

Dependencies (groupId : artifactId : version):
  mysql : mysql-connector-java : 5.1.44
  org.apache.flink : flink-scala_${scala.binary.version} : ${flink.version}
  org.apache.flink : flink-streaming-scala_${scala.binary.version} : ${flink.version}
  org.apache.flink : flink-clients_${scala.binary.version} : ${flink.version}
  org.apache.flink : flink-table-api-scala-bridge_${scala.binary.version} : ${flink.version}
  org.apache.flink : flink-table-planner-blink_${scala.binary.version} : ${flink.version}
  org.apache.flink : flink-table-common : ${flink.version}
  org.apache.flink : flink-runtime-web_2.11 : ${flink.version}
  org.scala-lang : scala-library : ${scala.version}
  org.apache.flink : flink-statebackend-rocksdb_${scala.binary.version} : ${flink.version}
  org.apache.flink : flink-connector-kafka_${scala.binary.version} : ${flink.version}
  org.apache.hbase : hbase-server : 2.2.4 (scope: provided)
  com.google.guava : guava : 29.0-jre
  com.alibaba : fastjson : 1.2.68
  org.apache.logging.log4j : log4j-slf4j-impl : ${log4j.version} (scope: runtime)
  org.apache.logging.log4j : log4j-api : ${log4j.version} (scope: runtime)
  org.apache.logging.log4j : log4j-core : ${log4j.version} (scope: runtime)
  org.apache.hadoop : hadoop-common : ${hadoop.version} (scope: provided; excludes org.slf4j:slf4j-log4j12, log4j:log4j, org.slf4j:slf4j-api, commons-logging:commons-logging)
  org.apache.hadoop : hadoop-hdfs : ${hadoop.version} (scope: provided; excludes log4j:log4j)
  org.apache.hadoop : hadoop-client : ${hadoop.version} (scope: provided)
  jdk.tools : jdk.tools : 1.8 (scope: system; systemPath: ${JAVA_HOME}/lib/tools.jar)

Build plugins:
  org.apache.maven.plugins : maven-compiler-plugin : 3.6.1 (source 1.8, target 1.8)
  org.scala-tools : maven-scala-plugin : 2.15.1 (execution compile-scala: add-source, compile; execution test-compile-scala: add-source, testCompile)
  org.apache.maven.plugins : maven-assembly-plugin : 3.0.0 (execution make-assembly: phase package, goal single)
  net.alchim31.maven : scala-maven-plugin : 3.2.2 (goals compile, testCompile)
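The sample above only issues an insert, while the approach calls for update and delete as well. Below is a minimal sketch of how an incoming line could be routed to one of the three statements. The line format ("I,<id>,<name>", "U,<id>,<name>", "D,<id>") and the names Change, Append, Update, Delete, and ChangeRouter are assumptions made for illustration and do not come from the original sample. Only the table name input and its id and name columns are taken from it.

```scala
import scala.util.Try

// Hypothetical change types; the original sample has no equivalent.
sealed trait Change
final case class Append(id: Int, name: String) extends Change
final case class Update(id: Int, name: String) extends Change
final case class Delete(id: Int) extends Change

object ChangeRouter {

  // Returns None when the id field is not a valid Int.
  private def toId(s: String): Option[Int] = Try(s.trim.toInt).toOption

  /** Parses one line in the assumed format; returns None for anything malformed. */
  def parse(line: String): Option[Change] =
    line.split(",", 3) match {
      case Array("I", id, name) => toId(id).map(Append(_, name))
      case Array("U", id, name) => toId(id).map(Update(_, name))
      case Array("D", id)       => toId(id).map(Delete(_))
      case _                    => None
    }

  /** SQL text plus positional parameters for the `input` table used in the sample. */
  def toSql(change: Change): (String, Seq[Any]) = change match {
    case Append(id, name) => ("insert into input (id, name) values (?,?)", Seq(id, name))
    case Update(id, name) => ("update input set name = ? where id = ?", Seq(name, id))
    case Delete(id)       => ("delete from input where id = ?", Seq(id))
  }
}
```

Inside flatMap, the returned SQL could be prepared on the existing connection, its parameters bound in order (for example with PreparedStatement.setObject), and executed before the same "select * from input" query runs. Lines that parse to None would simply be skipped.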