278 字
1 分钟
flink 1.17 source (输入) and sink (输出) Scala
Sources 输入
Flink Local File System
example:
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Read a text file from the local file system as a streaming source,
// print it, and also write it back out through a row-format FileSink.
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a file on the local file system.
    val value = env.readTextFile("/source/1.txt")

    // The original snippet used `sink` without defining it; define the
    // FileSink here so the example actually compiles. Rolls the part file
    // once it reaches 1 GiB.
    val sink: FileSink[String] = FileSink
      .forRowFormat(new Path("./output"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(MemorySize.ofMebiBytes(1024))
          .build())
      .build()

    value.print()
    value.sinkTo(sink)

    env.execute()
  }
}
Flink HDFS File System
example:
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Read a text file from HDFS as a streaming source, print it, and also
// write it back out through a row-format FileSink.
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a file on HDFS (replace xxx.xxx.xxx.xxx with the NameNode host).
    val value = env.readTextFile("hdfs://xxx.xxx.xxx.xxx:9000/data/word.txt")

    // The original snippet used `sink` without defining it; define the
    // FileSink here so the example actually compiles. Rolls the part file
    // once it reaches 1 GiB.
    val sink: FileSink[String] = FileSink
      .forRowFormat(new Path("./output"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(MemorySize.ofMebiBytes(1024))
          .build())
      .build()

    value.print()
    value.sinkTo(sink)

    env.execute()
  }
}
Flink KAFKA Source
package org.example

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

// Consume a Kafka topic as a streaming source and print each record.
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // NOTE: `builder()` needs the explicit element type in Scala —
    // `KafkaSource.builder[String]()` — otherwise the compiler infers
    // KafkaSourceBuilder[Nothing] and setValueOnlyDeserializer fails to type-check.
    val source = KafkaSource
      .builder[String]()
      .setBootstrapServers("172.18.38.32:9092,172.18.38.33:9092,172.18.38.34:9092") // setting kafka bootstrap servers
      .setTopics("test")                                                            // setting kafka topic
      .setGroupId("aasds")                                                          // setting kafka group_id
      .setStartingOffsets(OffsetsInitializer.earliest())                            // setting kafka offset
      .setValueOnlyDeserializer(new SimpleStringSchema())                           // value deserializer
      .build()

    // No watermarks: this example does not use event time.
    val stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Demo Source")
    stream.print()

    env.execute()
  }
}
Sinks 输出
Flink Local File System
使用本地系统输出
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
example:
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Local-file-system sink example: read a local text file and write it to
// ./output through a row-format FileSink.
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // This is the *local* file-system example; read a local file
    // (the original snippet read from HDFS, copy-pasted from the HDFS example).
    val value = env.readTextFile("/source/1.txt")

    // Row-format sink to the local directory ./output; rolls the part file
    // once it reaches 1 GiB.
    val sink: FileSink[String] = FileSink
      .forRowFormat(new Path("./output"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(MemorySize.ofMebiBytes(1024))
          .build())
      .build()

    value.print()
    value.sinkTo(sink)

    env.execute()
  }
}
Flink HDFS File System
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
使用hadoop系统输出
example:
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// HDFS sink example: read a text file from HDFS and write it back to an
// HDFS output directory through a row-format FileSink.
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a file on HDFS (replace xxx.xxx.xxx.xxx with the NameNode host).
    val value = env.readTextFile("hdfs://xxx.xxx.xxx.xxx:9000/data/word.txt")

    // Row-format sink to an HDFS directory; rolls the part file once it
    // reaches 1 GiB.
    val sink: FileSink[String] = FileSink
      .forRowFormat(new Path("hdfs://xxx.xxx.xxx.xxx:9000/output/flink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(MemorySize.ofMebiBytes(1024))
          .build())
      .build()

    value.print()
    value.sinkTo(sink)

    env.execute()
  }
}
flink 1.17 source (输入) and sink (输出) Scala
https://fuwari.vercel.app/posts/bigdata/scala/flink/flinksinkandsource/