Prerequisite jar: flink-sql-connector-kafka-1.15.0.jar (the fat Kafka SQL connector, loaded by the PyFlink job below)
Set the Java version (do this in every terminal session):
$ export JAVA_HOME=/Users/wangzhikai/jdk-11.0.15.jdk/Contents/Home
$ java -version
Output:
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)
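PyFlink launches its JVM from this environment variable, so it can help to confirm the setting from the same session where Python will run. A minimal sketch:
import os

# PyFlink starts its JVM from JAVA_HOME; it should point at the JDK 11 install above.
print(os.environ.get("JAVA_HOME"))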
Install Kafka:
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
Start ZooKeeper (bundled with Kafka):
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Create the topic (quickstart-events in this example) and start a producer:
$ /Users/wangzhikai/kafka_2.13-3.2.0/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ /Users/wangzhikai/kafka_2.13-3.2.0/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
$ /Users/wangzhikai/kafka_2.13-3.2.0/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
Optional: start a consumer to check that Kafka is running properly:
$ /Users/wangzhikai/kafka_2.13-3.2.0/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
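The broker can also be checked from Python. A minimal sketch, assuming the third-party kafka-python package (pip install kafka-python), which is not part of the setup above:
from kafka import KafkaConsumer

# Connect to the local broker and list the topics it knows about.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print(consumer.topics())  # should include 'quickstart-events'
consumer.close()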
Install Flink: same as Kafka, just extract the archive.
Start Flink:
$ ~/flink-1.15.0/bin/start-cluster.sh
Stop Flink:
$ ~/flink-1.15.0/bin/stop-cluster.sh
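Once started, the cluster's web UI is at http://localhost:8081 (Flink's default REST port). A minimal sketch to check it from Python, assuming the default port:
import json
from urllib.request import urlopen

# The /overview endpoint reports taskmanagers, available slots, and running jobs.
with urlopen("http://localhost:8081/overview") as resp:
    print(json.load(resp))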
The job:
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# the SQL connector for Kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///Users/wangzhikai/flink-sql-connector-kafka-1.15.0.jar")

deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW_NAMED(
        ["a", "b"], [Types.STRING(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
    topics='quickstart-events',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds = env.add_source(kafka_consumer)
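By default FlinkKafkaConsumer starts from the committed group offsets; to replay the whole topic, the start position can be set explicitly before adding the source, e.g.:
# Read the topic from the earliest offset instead of the committed group offsets.
kafka_consumer.set_start_from_earliest()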
Transform ds (the map below appends "d" to field b of each row):
from pyflink.common import Row

ds = ds.map(lambda row: Row(row[0], row[1] + "d"),
            output_type=Types.ROW([Types.STRING(), Types.STRING()]))
(see the list of available operators for other transformations; a couple of examples follow below)
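A few other operators on the same stream, as a minimal sketch (not part of the original job):
# Keep only rows whose first field is not '1'.
filtered = ds.filter(lambda row: row[0] != '1')

# Partition the stream by the first field, e.g. as a prelude to stateful operators.
keyed = ds.key_by(lambda row: row[0])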
Write messages to Kafka (field names like "a" and "b" must match those defined in the JsonRowDeserializationSchema, otherwise the output will be Row(None, None)):
# In kafka-console-producer, type each message after the > prompt, in the following format:
> {"a":1,"b":"dfajdslkfj"}
> {"a":5,"b":"gajgsjd"}
> {"a":2,"b":"dsfjalj"}
> ...
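The same messages can also be produced programmatically. A minimal sketch, again assuming the third-party kafka-python package; the keys must match the schema's field names:
import json
from kafka import KafkaProducer

# Serialize each dict as a JSON message whose keys match the schema ("a", "b").
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

producer.send("quickstart-events", {"a": 1, "b": "dfajdslkfj"})
producer.send("quickstart-events", {"a": 5, "b": "gajgsjd"})
producer.flush()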
View the printed results in a Jupyter notebook:
with ds.execute_and_collect() as results:
    for result in results:
        print(result)
<Row('1', 'dfajdslkfjd')>
<Row('5', 'gajgsjdd')>
<Row('2', 'dsfjaljd')>
...
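Note that with an unbounded Kafka source this loop runs until interrupted. To grab a fixed number of records and stop, execute_and_collect also accepts a limit and returns a list:
# Collect at most 5 records, then shut the job down automatically.
results = ds.execute_and_collect(limit=5)
for result in results:
    print(result)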