Kafka消息中间件

article/2025/6/22 13:59:28

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

window中的安装

①、下载并解压kafka压缩包,进入config目录下修改zookeeper.properties配置文件

因为kafka内置了zookeeper,所以不需安装zookeeper。设置zookeeper数据存储位置,如果该路径不存在,则自动创建

dataDir = E:/kafka/data/zk

②、进入bin目录下(linux脚本命令),进入window下属于window命令

cd bin/windows

执行:zookeeper-server-start.bat …/…/config/zookeeper.properties

为了后续的方便启动,可以创建一个zk.cmd文件

文件内容如下:
在这里插入图片描述

③、进入kafka的config目录下,修改server.properties配置文件

log.dirs=E:/kafka/local/data/

④、进入bin/windows目录下启动:kafka-server-start.bat …/…/config/server.properties

为了后续的方便启动,可以创建一个kfk.cmd文件

文件内容如下:
在这里插入图片描述

⑤、查看启动后的进程
在这里插入图片描述

集群规划安装

①、准备三台服务器
在这里插入图片描述
②、在其中一台服务器上操作:
在这里插入图片描述
server.properties主要修改的内容:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

③、通过命令xsync kafka 将其分发到其他服务器,并修改每台服务器的server.properties中broker.id的值
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
④、配置kafka环境变量
在这里插入图片描述
在这里插入图片描述
将xsync脚本分发到其他服务器
在这里插入图片描述

④、启动zookeeper:sk.sh start

启动kafka:
在这里插入图片描述

启动、停止脚本
在这里插入图片描述

#!/bin/bashcase $1 in
"start")for i in hadoop102 hadoop103 hadoop104 doecho "---启动 $i kafka ---"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -ddaemon /opt/module/kafka/config/server.properties" done
;;"stop")for i in hadoop102 hadoop103 hadoop103doecho "---停止 $i kafka ---"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh" done
;;
esac

在这里插入图片描述

命令行操作:

  • 主题 kafka-topic.sh
    (1) --bootstrap-server hadoop102:9092,hadoop103:9092 连接服务器
    (2) --topic first
    (3) --create
    (4) --delete
    (5) --partitions 分区
    (6) --replication-factor 副本

  • 生产者 kafka-console-producer.sh
    (1) --bootstrap-server hadoop102:9092,hadoop103:9092 连接服务器
    (2) --topic first

  • 消费者 kafka-console-consumer.sh
    (1) --bootstrap-server hadoop102:9092,hadoop103:9092 连接服务器
    (2) --topic first

window命令行

主题创建
在这里插入图片描述
如果JDK版本过低,会出现很多日志,所以提高JDK的版本,再创建主题
在这里插入图片描述
可以修改以下文件,重新设置JDK环境变量:
在这里插入图片描述

查看(包括详细查看):
在这里插入图片描述
修改:
在这里插入图片描述

生产者和消费者
在这里插入图片描述

linux命令行
在这里插入图片描述

bin/kafka-topics.sh # 显示所有操作选项bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list #查看当前服务器所有的topic#创建first主题,指定分区,指定副本数
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3

在这里插入图片描述

# 修改分区数,只能增加,不能减少
bin/kafka-topics.sh --bootstrap=server hadoop102:9092 --topic first --alter --partitions 3
# 生产者连接 hadoop102:9092的first主题,生产消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello# 消费者相对应地消费消息
bin/kafla-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstbin/kafla-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning #查看消费所有的历史记录

代码操作

①、创建项目并添加依赖
在这里插入图片描述
在这里插入图片描述

②、生产者

public class KafkaProducerTest{public static void main(String[] args){//创建配置对象Map<String,Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//对key/value进行序列化configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者KafkaProducer<String,String> producer = new KafkaProducer<>(configMap);//创建数据ProducerRecord<String,String> record = new ProducerRecord<>("test","key","value");//通过生产者对象将数据发送到kafkaproducer.send(record);//关闭生产者对象producer.close();}
}

③、消费者

public class KafkaConsumerTest{public static void main(String[] args){Map<String,Object> consumerConfig = new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_ERVERS_CONFIG,"localhost:9092");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//对key/value进行反序列化consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu")KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));//从kafka的主题中获取数据,参数为超时时间final ConsumerRecords<String,String> datas = consumer.poll(100);for(ConsumerRecord<String,String> data:datas){System.out.println(data);}//关闭消费者对象consumer.close();}
}

Kafka工具

在这里插入图片描述
双击安装之后
在这里插入图片描述
在这里插入图片描述
创建主题
在这里插入图片描述
添加数据
在这里插入图片描述

查看数据
在这里插入图片描述

生产者

在这里插入图片描述
生产者三种发送方式:

  • 异步发送 kafkaProducer.send(new ProducerRecord<>(“first”,“atguigu”+i));
  • 异步回调发送 kafkaProducer.send(new ProducerRecord<>(“first”,“atguigu” + i),new Callback(){}
  • 同步发送 kafkaProducer.send(new ProducerRecord<>(“first”,“atguigu”+i)).get();
public class CustomProducer{public static void main(){//配置Properteis properties = new Properties();//连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(PorducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(PorducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.创建Kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);//2.发送数据for(int i=0;i<5;i++){//异步发送kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));	//回调异步发送kafkaProducer.send(new ProducerRecord<>("first","atguigu" + i),new Callback(){@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception){if(exception == null){System.out.println("主题:"+metadata.topic()+"分区:"+metadata.partition());}}});//发送同步数据kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i)).get();}//3.关闭资源kafkaProducer.close();}
}

分区

  • 默认分区
  • 自定义分区
//参数中指定分区
kafkaProducer.send(new ProducerRecord<>("first",1,"","atguigu"+i),new Callback(){@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception){if(exception == null){System.out.println("主题:"+metadata.topic()+" 分区: "+metadata.partition());}}
});

在这里插入图片描述

自定义分区器

①、发送过来的数据如果包含atguigu,就发往0号分区;不包含atguigu,就发往1号区

②、分区策略配置

public class MyPartitioner implements Partitioner{@Overridepublic int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,cluster cluster){String msgValues = value.toString();int patition;if(msgValues.contains("atguigu")){partition = 0;}else{partition = 1;}return partition;}@Overridepublic void close(){}@Overridepublic void configure(Map<String,?> configs){}
}

③、生产者

//配置
Properteis properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(PorducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(PorducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//关联自定义分区策略
properties.put(ProducerConfig.APRITIONER_CLASS_CONFIG,"MyPartitioner");KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);//消息发送
//........

提供生产者吞吐量

四个提高吞吐量的参数

public class CustomProducerParameters{public static void main(){//配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//缓存区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//linger.msproperties.put(ProducerConfig.LINGER-MS_CONFIG,1);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//生产者KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);//发送数据for(int i = 0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}//关闭资源kafkaProducer.close();}
}

数据可靠性
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

public class CustomProducerParameters{public static void main(){//配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//acks,数据的可靠性properties.put(ProducerConfig.ACKS_CONFIG,"1");//重复次数properties.put(ProduceConfig.RETRIES_CONFIG,3);//发送数据for(int i = 0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}//关闭资源kafkaProducer.close();}
}

事务
在这里插入图片描述

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERRIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");//指定事务idKafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);kafkaProducer.initTransactions();//初始化
kafkaProducer.beginTransaction();//开始事务try{for(int i = 0;i < 5;i++){kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));kafkaProducer.commitTransaction();//提交事务
}
}catch(Exception e){kafkaProducer.abortTransaction();//终止事务
}finally{kafkaProducer.close();
}

数据乱序
在这里插入图片描述

Kafka_Broker

连接zookeeper工具
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

服役新节点

①、关闭hadoop104(克隆的方式,不需重新安装JDK,Kafka),开启hadoop105(修改IP)
在这里插入图片描述

在这里插入图片描述

②、hadoop105上,修改主机名称hadoop105
在这里插入图片描述
删除克隆后,原先主机的内容
在这里插入图片描述
修改kafka/config目录下的配置文件server.properties内容

  • broker.id=3

③、启动hadoop105的kafka

bin/kafka-server-start.sh -daema config/server.properties

④、执行负载均衡操作

创建一个要均衡的主题(可以对多个主题):vim topics-to-move.json

{"topics":[{"topic":"first"}],"version":1
}

在这里插入图片描述
为4台服务器生成负载均衡计划
在这里插入图片描述

⑤、创建副本存储计划(所有副本存储在broker0,broker1,broker2,broker3)

将第④步的生成计划复制到lication-factor.json文件中
在这里插入图片描述

执行副本存储计划
在这里插入图片描述

验证副本存储计划
在这里插入图片描述

退役旧节点

①、执行负载均衡操作,创建一个要均衡的主题
在这里插入图片描述

②、创建执行计划
在这里插入图片描述
将生成的内容复制到:vim increase-replication-factor.json

③、创建副本存储计划(所有副本存储在broker0 broker1 broker2)
在这里插入图片描述

④、验证副本计划
在这里插入图片描述

副本
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
依照命令创建主题atguigu2,并查看详细情况
在这里插入图片描述
进入对应的服务器中,停用对应的kafka,再查看详细情况
在这里插入图片描述
进入对应的服务器中,恢复对应的kafka,再查看详细情况
在这里插入图片描述

分区副本分配

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产环境中,每台服务器的配置和性能不一致,但是kafka只会根据自己的代码规则创建对应的分区副本
就会导致个别的服务器存储压力较大,所以需要手动调整分区副本的存储

需求:创建一个新的topic,4个分区,两个副本,名称为three。
将该topic所有副本存储到broker0和broker1两台服务器上
在这里插入图片描述
手动调整分区副本存储:
在这里插入图片描述
在这里插入图片描述

增加副本因子

通过命令无法提高副本,需要通过文件计划的方式
在这里插入图片描述

文件存储

在这里插入图片描述

topic数据存储的位置
在这里插入图片描述
在这里插入图片描述

文件清理策略
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

public class CustomerConsumer{public static void main(String[] args){//配置Properties properteis = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//连接properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建一个消费者KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);//订阅主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}}}
}

在这里插入图片描述

public class CustomerConsumer{public static void main(String[] args){//配置Properties properteis = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//连接properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建一个消费者KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);//订阅主题对应的分区ArrayList<String> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));//指定分区发送数据kafkaConsumer.subscribe(topicPartitions);//消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}}}
}

分区的分配以及再平衡

  • Range以及再平衡
  • RoundRobin以及再平衡
  • Sticky以及再平衡

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

自动提交offset
在这里插入图片描述

//自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

手动提交offset
在这里插入图片描述

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//.......while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}//手动提交offsetkafkaConsumer.commitSync();//同步提交//kafkaConsumer.commitAsync(); 异步提交
}

指定offset消费
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

按照时间进行消费
在这里插入图片描述

重复消费:已经消费的数据,但是offset没提交
漏消费:先提交offset后消费,有可能会造成数据的漏消费
在这里插入图片描述

消费者事务

  • 生产端——》集群
  • 集群——》消费者
  • 消费者——》下游的框架

数据积压(消费者如何提高吞吐量)

  • 增加分区,增加消费者个数
  • 生产=》集群 4个参数

在这里插入图片描述

Kafka-Eagle框架监控

监控kafka集群

①、准备MySQL
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

②、Kafka环境准备
在这里插入图片描述

③、Kafka-Eagle安装

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
访问用户名admin 密码123456
在这里插入图片描述

Kafka-Kafka模式

取代Zookeeper

在这里插入图片描述
在这里插入图片描述

集群部署
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka-Kafka模式集群启动和停止脚本

在这里插入图片描述


http://www.hkcw.cn/article/lQtZNivdqu.shtml

相关文章

4.2.4 Spark SQL 数据写入模式

在本节实战中&#xff0c;我们详细探讨了Spark SQL中数据写入的四种模式&#xff1a;ErrorIfExists、Append、Overwrite和Ignore。通过具体案例&#xff0c;我们演示了如何使用mode()方法结合SaveMode枚举类来控制数据写入行为。我们首先读取了一个JSON文件生成DataFrame&#…

day23-计算机网络-1

1. 网络简介 1.1. 网络介质 网线&#xff1a;cat5,cat5e 六类网线&#xff0c;七类网线&#xff0c;芭蕾网线光纤&#xff1a;wifi&#xff1a;无线路由器&#xff0c;ap5G 1.2. 常见网线类型 1.2.1. 双绞线&#xff08;Twisted Pair Cable&#xff09;【最常用】 按性能主…

Ubuntu下编译mininim游戏全攻略

目录 一、安装mininim 软件所依赖的库&#xff08;重点是allegro游戏引擎库&#xff09;二、编译mininim 软件三、将mininim打包给另一个Ubuntu系统使用四、安卓手机运行mininim 一、安装mininim 软件所依赖的库&#xff08;重点是allegro游戏引擎库&#xff09; 1. 用apt-get…

org.junit.runners.model.InvalidTestClassError:此类问题的解决

不知道大家是否遇见过以上这种情况&#xff0c;我也是今天被这个错误搞得很烦&#xff0c;后来通过网上查找资料终于找到了问题所在————就是简单的Test注解的错误使用 Test注解的注意情况 &#xff1a;1 权限必须是public 2 不能有参数 3 返回值类型是void 4 本类的其他的…

2025年渗透测试面试题总结-匿名[校招]渗透测试(打击黑灰产)(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 匿名[校招]渗透测试(打击黑灰产) 2. 实习时达成的目标 3. 文件包含漏洞 4. Redis未授权访问利用 5. 钓鱼…

【Hot 100】55. 跳跃游戏

目录 引言跳跃游戏我的解题 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;算法专栏&#x1f4a5; 标题&#xff1a;【Hot 100】55. 跳跃游戏❣️ 寄语&#xff1a;书到用时方恨少&#xff0c;事非经过不知难&#xff01; 引言 跳跃游戏 &#x…

Go 语言的 GC 垃圾回收

序言 垃圾回收&#xff08;Garbage Collection&#xff0c;简称 GC&#xff09;机制 是一种自动内存管理技术&#xff0c;主要用于在程序运行时自动识别并释放不再使用的内存空间&#xff0c;防止内存泄漏和不必要的资源浪费。这篇文章让我们来看一下 Go 语言的垃圾回收机制是如…

qwen 2.5 并行计算机制:依靠 PyTorch 和 Transformers 库的分布式能力

qwen 2.5 并行计算机制:依靠 PyTorch 和 Transformers 库的分布式能力 完整可运行代码: import torch import torch.nn.functional as F from transformers

如何评估CAN总线信号质量

CAN总线网络的性能在很大程度上取决于其信号质量。信号质量差可能导致通信错误&#xff0c;进而引发系统故障、效率降低甚至安全隐患。因此&#xff0c;评估和确保CAN总线信号质量是维护系统健康和可靠性的关键。 在CAN总线网络中&#xff0c;数据通过双绞线上的差分信号传输。…

第三方软件评测机构如何助力软件品质提升及企业发展?

第三方软件评测机构与软件开发者及使用者无直接关联&#xff0c;它们提供全方位的检测和公正的评价服务。这样的评测可以展现客观的成效&#xff0c;对提升软件的品质具有显著影响&#xff0c;且在软件产业中发挥着至关重要的角色。 评测的客观性 独立第三方机构与软件开发者…

Linux之MySQL安装篇

1.确保Yum环境是否能正常使用 使用yum环境进行软件的安装 yum -y install mysql-server mysql2.确保软件包已正常完成安装 3.设置防火墙和selinux配置 ## 关闭防火墙 systemctl stop firewalld## 修该selinux配置 vim /etc/selinux/config 将seliuxenforcing修改为sel…

Java 项目架构设计:模块化、分层架构的实战经验

Java 项目架构设计&#xff1a;模块化、分层架构的实战经验 在当今复杂多变的软件开发领域&#xff0c;Java 项目架构设计起着至关重要的作用。良好的架构设计不仅能够提升项目的可维护性、可扩展性&#xff0c;还能有效降低系统的耦合度&#xff0c;提高开发效率。而模块化与…

uniapp 键盘顶起页面问题

关于uniapp中键盘顶起页面的问题。这是一个在移动应用开发中常见的问题&#xff0c;特别是当输入框位于页面底部时&#xff0c;键盘弹出会顶起整个页面&#xff0c;导致页面布局错乱。 pages.json 文件内&#xff0c;在需要处理软键盘的页面添加 softinputMode 配置&#xff1…

截面动量策略思路

该策略旨在实现期货日频多品种交易&#xff0c;采用MA双均线结合百分比追踪止损的方法。策略建议初始资金为1000000元&#xff0c;并基于2012年1月1日至今的数据进行回测。策略的核心逻辑包括主力合约的动态切换、双均线交叉信号的生成以及基于百分比的追踪止损机制。 交易逻辑…

HCIE-STP复习

文章目录 STP STP &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Datacom专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2025年05月31日13点17STP通过三要素选举消除环路&#xff1a; 根桥&#xff08;BID最小&#xff0c;建议设优先级为0&…

Git入门到精通:30分钟掌握核心技巧

目录 一、基础理论片 Git简介 Git安装 Git仓库 Git基本命令用法 仓库别名 二、实操命令篇 远程分支 分支的新建和合并 实操演示 1 本地新建仓库 2 gitee新建仓库 3 建立关系 4 新建分支 5 开发新功能 6 推送新分支 7 合并新分支到主分支 三、可视化工具篇 G…

告别压降损耗与反向电流困扰:汽车电子电源防反接方案全面解析与理想二极管应用

在汽车电子系统中&#xff0c;由于电源反接、快速负脉冲群、微关断、叠加交流等防护要求&#xff0c;需要设计防反电路。常见电路中&#xff0c;依赖肖特基二极管实现电池反接保护和电源冗余&#xff08;ORing&#xff09;设计。然而&#xff0c;随着功率密度和效率要求飙升&am…

5.1 初探大数据流式处理

在本节中&#xff0c;我们深入探讨了大数据流式处理的基础知识和关键技术。首先&#xff0c;我们区分了批式处理和流式处理两种大数据处理方式&#xff0c;了解了它们各自的适用场景和特点。流式处理以其低延迟和高实时性适用于需要快速响应的场景&#xff0c;而批式处理则适用…

线程概念与控制

目录 Linux线程概念 什么是线程 分页式存储管理 虚拟地址和页表的由来 物理内存管理 页表 提问 解答 缺页异常 线程的优点 线程的缺点 线程异常 Linux进程VS线程 进程与线程 进程的多个线程共享 进程与线程关系如图 Linux线程控制 POSIX线程库 创建线程 测试…

SAR ADC 同步逻辑设计

SAR ADC的逻辑是重要的一个模块&#xff0c;可以分为同步逻辑和异步逻辑&#xff0c;对于低速SAR ADC&#xff0c;一般采用同步逻辑&#xff0c;对于高速SAR ADC&#xff0c;一般采用异步逻辑。 对于同步逻辑&#xff0c;由于架构不同&#xff0c;有先置位再比较&#xff0c;也…