Kafka集成Flume/Spark/Flink(大数据)/SpringBoot

article/2025/6/26 12:13:58

Kafka集成Flume

在这里插入图片描述

Flume生产者

在这里插入图片描述
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Flume消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka集成Spark

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties  = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//创建一个生产者var producer = new KafkaProducer[String,String](properties)//发送数据for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//关闭资源producer.close()}
}

在这里插入图片描述

消费者
在这里插入图片描述

Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消费数据val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//执行代码,并阻塞ssc.start()ssc.awaitTermination()}
}

Kafka集成Flink

在这里插入图片描述

创建maven项目,导入以下依赖
在这里插入图片描述
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
在这里插入图片描述

Flink生产者

public class FlinkafkaProducer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//准备数据源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//创建一个kafka生产者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加数据源Kafka生产者stream.addSink(kafkaProducer);//执行env.execute();}
}

在这里插入图片描述

Flink消费者

public class FlinkafkaConsumer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//创建一个消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//关联消费者和flink流env.addSource(kafkaConsumer).print();//执行env.execute();}
}

Kafka集成SpringBoot

在这里插入图片描述
在这里插入图片描述

生产者
在这里插入图片描述
在这里插入图片描述
通过浏览器发送
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

在这里插入图片描述


http://www.hkcw.cn/article/UnvkCvFbAT.shtml

相关文章

Linux指令:

我们今天来学习一下linux的一些相关的指令L&#xff1a; 1. 快速认识6~8个指令&#xff1a; 第一条&#xff1a;pwd pwd指令表示的是我当前在哪条路径下&#xff1b;我当前在哪里&#xff1b; 我们看这个第二句话&#xff0c;因为在windows环境下&#xff0c;当我们登录进入到…

网络攻防技术五:网络扫描技术

文章目录 一、网络扫描的基础概念二、主机发现三、端口扫描1、端口号2、端口扫描技术3、端口扫描隐秘策略 四、操作系统识别五、漏洞扫描六、简答题1. 主机扫描的目的是什么&#xff1f;请简述主机扫描方法。2. 端口扫描的目的是什么&#xff1f;请简述端口扫描方法及扫描策略。…

win32相关(虚拟内存和物理内存)

虚拟内存和物理内存 在win32操作系统下&#xff0c;每个进程都有它自己独立的4GB空间&#xff0c;是window给它分配的一个虚拟空间&#xff0c;并不是真正的物理空间&#xff0c;这4GB空间中&#xff0c;分为高2G和低2G&#xff0c;高2G是应用程序的&#xff0c;低2G空间是给内…

00后新人“整顿”婚礼 简简单单更实在!

00后新人“整顿”婚礼简简单单更实在!婚礼当天,宾客们刚坐下,新郎新娘就手拉手走上台。新郎咧嘴一笑:“感谢各位来捧场,我俩今天正式领证了!”新娘接茬:“废话不多说,大家吃好喝好,菜不够再加,吃不完打包带走!”台下瞬间爆发出欢呼声,这场婚礼从开始到宣布开席,总…

德约科维奇vs诺里 法网百胜里程碑

北京时间6月2日,法网男单第四轮比赛中,赛会六号种子德约科维奇以6-2、6-3、6-2的比分击败诺里,成功晋级八强。首盘开始,德约连保带破取得2-0领先。尽管诺里随后回破,但德约在第四局再次破发,掌控了比赛节奏。最终在诺里的关键发球局中,德约再次破发,以6-2赢得首盘。第二…

地磁暴又来了!这些地方受影响 我国北部或现极光

中国气象局国家空间天气监测预警中心报告称,北京时间5月31日太阳爆发了耀斑,地球可能连续三天发生地磁暴,我国北部有机会出现较为明显的极光。具体来说,5月31日7点45分左右,太阳活动区14100开始爆发耀斑,软X射线流量快速上升,8点05分达到峰值——M8.1级中等耀斑强度。伴…

北京今天晴朗伴大风 阵风可达6至7级 明起炎热升级 气温将明显升高

今天6月3日,北京天气以晴朗为主,北风加大,阵风可达6至7级。未来两天,北京将继续保持晴朗,气温显著升高。昨天,北京大部分地区晴朗,北部和西部出现分散性阵雨或雷阵雨,但雨量不大。多地北风强劲,阵风达到6至7级。据北京市气象台预计,今天白天晴间多云,北风从2级左右逐…

全都要!德天空:新月将和胜利争夺C罗,同时致力于签下B费 沙特豪门雄心勃勃

据德国天空体育报道,利雅得新月在与利雅得胜利争夺C罗的同时,还致力于签下布鲁诺-费尔南德斯。C罗与利雅得胜利的合同将于6月底到期。尽管利雅得胜利的体育总监耶罗极力挽留,但利雅得新月希望签下这名40岁的老将,并带他参加即将举行的世俱杯(6月14日至7月13日)。利雅得新…

[HTML5]快速掌握canvas

背景 canvas 是 html5 标准中提供的一个标签, 顾名思义是定义在浏览器上的画布 通过其强大的绘图接口&#xff0c;我们可以实现各种各样的图形&#xff0c;炫酷的动画&#xff0c;甚至可以利用他开发小游戏&#xff0c;包括市面上很流行的数据可视化框架底层都用到了Canvas。…

Mininconda3安装使用

一、简介 Anaconda和Miniconda都是非常流行的Python发行版&#xff0c;它们都提供了强大的包管理系统和环境管理系统&#xff0c;让Python编程变得超级简单。 但Miniconda是Anaconda的精简版&#xff0c;只包含最基本的conda包管理器和Python环境管理器&#xff0c;不像Anaco…

设备驱动与文件系统:03 生磁盘的使用

磁盘驱动学习开篇 从这一讲开始&#xff0c;我们进入设备驱动的学习&#xff0c;具体聚焦于设备管理的最后一个部分——磁盘管理。磁盘管理实践也是操作系统课程的最后一块内容。磁盘的驱动器本质上仍是一种设备驱动&#xff0c;其原理不变&#xff0c;核心依旧是文件视图、磁…

【MATLAB代码】制导——平行接近法,三维,目标是运动的,订阅专栏后可直接查看MATLAB源代码

文章目录 运行结果简介代码功能概述运行结果核心模块解析代码特性与优势 MATLAB例程代码调整说明相关公式视线角速率约束相对运动学方程导引律加速度指令运动学更新方程拦截条件判定 运行结果 运行演示视频&#xff1a; 三维平行接近法导引运行演示 简介 代码功能概述 本代码…

spdlog介绍与使用

文章目录 spdlog的介绍与安装使用样例二次封装 spdlog的介绍与安装 spdlog 是一个高性能、超快速、零配置的 C 日志库&#xff0c;它旨在提供简洁的 API 和丰富的功能&#xff0c;同时保持高性能的日志记录。它支持多种输出目标、格式化选项、线程安全以及异步日志记录。 特点…

w373驾校预约学习系统的设计与实现

&#x1f64a;作者简介&#xff1a;多年一线开发工作经验&#xff0c;原创团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文…

【PCB设计】STM32开发板——产品设计流程及元件选型

一、PCB设计流程 二、产品设计流程 三、需求及方案 四、元器件的选型 1.MCU选型 2.STM32单片机命名规则 根据命名规则及我们的需求&#xff0c;最终选择使用STM32F103VET6的芯片。 3.MCU最小系统 MCU最小系统的电路设计可以参考相关的数据手册。 4.信号接口选型 ADC以及DAC一…

守护进程导致程序kill掉后被重新拉起

ps aux | grep "supervisord" 从上面的命令可以查找到守护进程的配置文件位置&#xff1a; /etc/supervisor/supervisord.conf 从配置信息看&#xff0c;守护进程的配置文件&#xff0c;又加载了/etc/supervisor/conf.d/*.conf的所有相关配置信息&#xff1b; cat /…

【iOS安全】使用LLDB调试iOS App | LLDB基本架构 | LLDB安装和配置

LLDB基本架构 参考&#xff1a; https://crifan.github.io/ios_re_dynamic_debug/website/debug_code/lldb_debugserver.html https://book.crifan.org/books/ios_re_debug_debugserver_lldb/website/ LLDB安装和配置 1. 让iPhone中出现/Developer/usr/bin/debugserver 最初…

RPG19.设置敌人

1.启动项目&#xff0c;创建爱你CharacterBase的子类 2.创建敌人数据资产 3.创建敌人的ASC 4.创建敌人的CombatComponent 5.打开EnemyCharacter&#xff0c; // Fill out your copyright notice in the Description page of Project Settings.#pragma once#include "Core…

ROS 2源换源后GPG错误解决方法

报错形式&#xff1a; 解决&#xff1a; 1. 删除旧的 ROS 密钥 sudo rm /etc/apt/trusted.gpg.d/ros.gpg 2. 重新下载并导入新的 ROS GPG 密钥 sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /etc/apt/trusted.gpg.d/ros.gpg 3. 确…

day 43

应用cnn对kaggle上的图像数据集进行练习 数据集地址&#xff1a;Cat and Dog import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pyplot as plt im…