Flink 1.17 Sink (output) and Source (input) in Scala

Sources (input)

example: reading a text file from the local filesystem:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the local file line by line as a bounded stream
    val value = env.readTextFile("/source/1.txt")
    value.print()
    env.execute()
  }
}
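
readTextFile is a legacy API; recent Flink releases point to the FileSource connector instead (it lives in flink-connector-files, the same artifact used for the sink section below). A minimal sketch, assuming the same /source/1.txt path as above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object FileSourceMain {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bounded file source: reads the file once, one String record per line
    val source = FileSource
      .forRecordStreamFormat(new TextLineInputFormat(), new Path("/source/1.txt"))
      .build()
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source").print()
    env.execute()
  }
}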

example: reading a text file from HDFS:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read from HDFS; Hadoop classes must be on the classpath (see the note at the end)
    val value = env.readTextFile("hdfs://xxx.xxx.xxx.xxx:9000/data/word.txt")
    value.print()
    env.execute()
  }
}
example: consuming a Kafka topic with the KafkaSource connector:

package org.example

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val value = KafkaSource
      .builder[String]()
      .setBootstrapServers("172.18.38.32:9092,172.18.38.33:9092,172.18.38.34:9092") // Kafka bootstrap servers
      .setTopics("test")                                  // topic to consume
      .setGroupId("aasds")                                // consumer group id
      .setStartingOffsets(OffsetsInitializer.earliest())  // start from the earliest offset
      .setValueOnlyDeserializer(new SimpleStringSchema()) // deserialize only the record value, as a String
      .build()
    val value1 = env.fromSource(
      value,
      WatermarkStrategy.noWatermarks(), // no event-time watermarks needed for this demo
      "Demo Source"
    )
    value1.print()
    env.execute()
  }
}
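
The Kafka source is not covered by the file connector dependency below; it needs its own artifact. For Flink 1.17 the Maven coordinates should be the following (check the version against the Flink downloads page for your release):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>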

Sinks (output)

Writing to the local filesystem

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>

example: writing each record as a line to a local directory with FileSink:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val value = env.readTextFile("hdfs://xxx.xxx.xxx.xxx:9000/data/word.txt")
    // Row-format sink: encode each record as a UTF-8 line under ./output
    val sink: FileSink[String] = FileSink
      .forRowFormat(new Path("./output"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(MemorySize.ofMebiBytes(1024)) // roll to a new part file at 1 GiB
          .build())
      .build()
    value.print()
    value.sinkTo(sink)
    env.execute()
  }
}
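
DefaultRollingPolicy can roll on time as well as size. A minimal sketch, with arbitrarily chosen intervals, reusing the imports from the example above plus java.time.Duration:

import java.time.Duration

// Roll a part file when it reaches 1 GiB, every 15 minutes,
// or after 5 minutes with no new records, whichever comes first.
val rollingPolicy = DefaultRollingPolicy.builder()
  .withMaxPartSize(MemorySize.ofMebiBytes(1024))
  .withRolloverInterval(Duration.ofMinutes(15))
  .withInactivityInterval(Duration.ofMinutes(5))
  .build()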

Writing to HDFS

example: writing to an HDFS directory with FileSink:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val value = env.readTextFile("hdfs://xxx.xxx.xxx.xxx:9000/data/word.txt")
    // Same row-format sink as above, but the output path now points at HDFS
    val sink: FileSink[String] = FileSink
      .forRowFormat(new Path("hdfs://xxx.xxx.xxx.xxx:9000/output/flink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(MemorySize.ofMebiBytes(1024))
          .build())
      .build()
    value.print()
    value.sinkTo(sink)
    env.execute()
  }
}
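
Note that hdfs:// paths, both for readTextFile above and for this sink, only resolve when Hadoop classes are on the classpath. On a cluster that typically means setting HADOOP_CLASSPATH; when running from an IDE, one way to provide them is a hadoop-client dependency (${hadoop.version} is a placeholder here, match it to your cluster):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>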