Kafka

安装kafka

1.tar -zxvf .

2.进入到config目录下修改properties

broker.id

listeners=PLAINTEXT://192.168.11.140:9092

zookeeper.connect

3.启动

sh kafka-server-start.sh -daemon ../config/server.properties

sh kafka-server-stop.sh

 

 

zookeeper上注册的节点信息

cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config

 

controller – 控制节点

brokers  – kafka集群的broker信息 。 topic

consumer  ids/owners/offsets

基本操作

http://kafka.apache.org/documentation/#quickstart

 

kafka的实现细节

消息

消息是kafka中最基本的数据单元。消息由一串字节构成,其中主要由key和value构成,key和value也都是byte数组。key的主要作用是根据一定的策略,将消息路由到指定的分区中,这样就可以保证包含同一key的消息全部写入到同一个分区中,key可以是null。为了提高网络的存储和利用率,生产者会批量发送消息到kafka,并在发送之前对消息进行压缩

 

topic&partition

Topic是用于存储消息的逻辑概念,可以看作一个消息集合。每个topic可以有多个生产者向其推送消息,也可以有任意多个消费者消费其中的消息

每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的;

 

 

Partition是以文件的形式存储在文件系统中,存储在kafka-log目录下,命名规则是:<topic_name>-<partition_id>

 

kafka的高吞吐量的因素

  1. 顺序写的方式存储数据 ;
  2. 批量发送;在异步发送模式中。kafka允许进行批量发送,也就是先讲消息缓存到内存中,然后一次请求批量发送出去。这样减少了磁盘频繁io以及网络IO造成的性能瓶颈

batch.size 每批次发送的数据大小

linger.ms  间隔时间

  1. 零拷贝

消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不懂的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤

▪ 操作系统将数据从磁盘读入到内核空间的页缓存

▪ 应用程序将数据从内核空间读入到用户空间缓存中

▪ 应用程序将数据写回到内核空间到socket缓存中

▪ 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

 

通过“零拷贝”技术可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数

日志策略

日志保留策略

无论消费者是否已经消费了消息,kafka都会一直保存这些消息,但并不会像数据库那样长期保存。为了避免磁盘被占满,kafka会配置响应的保留策略(retention policy),以实现周期性地删除陈旧的消息

kafka有两种“保留策略”:

  1. 根据消息保留的时间,当消息在kafka中保存的时间超过了指定时间,就可以被删除;
  2. 根据topic存储的数据大小,当topic所占的日志文件大小大于一个阀值,则可以开始删除最旧的消息

日志压缩策略

在很多场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。我们可以开启日志压缩功能,kafka定期将相同key的消息进行合并,只保留最新的value值

消息可靠性机制

消息发送可靠性

生产者发送消息到broker,有三种确认方式(request.required.acks)

acks = 0: producer不会等待broker(leader)发送ack 。因为发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步),既有可能丢失也可能会重发。

acks = 1: 当leader接收到消息之后发送ack,丢会重发,丢的概率很小

acks = -1: 当所有的follower都同步消息成功后发送ack.  丢失消息可能性比较低。

消息存储可靠性

每一条消息被发送到broker中,会根据partition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。

在创建topic时可以指定这个topic对应的partition的数量。在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断这个消息发送到哪个partition。

kafka的高可靠性的保障来自于另一个叫副本(replication)策略,通过设置副本的相关参数,可以使kafka在性能和可靠性之间做不同的切换。

 

高可靠性的副本

sh kafka-topics.sh –create –zookeeper 192.168.11.140:2181 –replication-factor 2 –partitions 3 –topic sixsix

–replication-factor表示的副本数

副本机制

ISR(副本同步队列)

维护的是有资格的follower节点

  1. 副本的所有节点都必须要和zookeeper保持连接状态
  2. 副本的最后一条消息的offset和leader副本的最后一条消息的offset之间的差值不能超过指定的阀值,这个阀值是可以设置的(lag.max.messages)

HW&LEO

关于follower副本同步的过程中,还有两个关键的概念,HW(HighWatermark)和LEO(Log End Offset). 这两个参数跟ISR集合紧密关联。HW标记了一个特殊的offset,当消费者处理消息的时候,只能拉去到HW之前的消息,HW之后的消息对消费者来说是不可见的。也就是说,取partition对应ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。每个replica都有HW,leader和follower各自维护更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步更新HW,此时消息才能被consumer消费。这样就保证了如果leader副本损坏,该消息仍然可以从新选举的leader中获取

LEO 是所有副本都会有的一个offset标记,它指向追加到当前副本的最后一个消息的offset。当生产者向leader副本追加消息的时候,leader副本的LEO标记就会递增;当follower副本成功从leader副本拉去消息并更新到本地的时候,follower副本的LEO就会增加

 

分布式事务

分布式事务产生背景

[database transaction]

 

数据库事务要满足几个要求:ACID

Atomic(原子性)     事务必须是原子的工作单元

Consistent(一致性)  事务完成时,必须使所有数据都保持一致状态

Isolation(隔离性)    并发事务所做的修改必须和其他事务所做的修改是隔离的

Duration(持久性) 事务完成之后,对系统的影响是永久性的

 

Mysql里的事务处理过程

  1. 记录redo和undo log文件,确保日志在磁盘上的持久化
  2. 更新数据记录
  3. 提交事务 ,redo 写入commit记录

Undo + Redo事务的简化过程

  假设有A、B两个数据,值分别为1,2,开始一个事务,事务的操作内容为:把1修改为3,2修改为4,那么实际的记录如下(简化):
  A.事务开始.
B.记录A=1到undo log.
C.修改A=3.
D.记录A=3到redo log.
E.记录B=2到undo log.
F.修改B=4.
G.记录B=4到redo log.
H.将redo log写入磁盘。
  I.事务提交

分布式事务

数据库分库分表

SOA化

X/OpenDTP事务模型

X/Open Distributed Transaction Processing Reference Model

X/Open是一个组织机构,定义出的一套分布式事务标准, 定义了规范的API接口

 

2PC(two -phase-commit), 用来保证分布式事务的完整性

J2EE 遵循了X/open DTP规范,设计并实现了java里面的分布式事务编程接口规范-JTA

XA是X/Open DTP定义的中间件与数据库之间的接口规范。 XA接口函数由数据库厂商提供

 

X/OpenDTP 角色

AP application  具体的应用 比如上图的库存中心、订单中心

RM resouces manager   资源管理器。 数据库

TM transaction manager  事务管理器,事务协调者

2PC(two -phase-commit)

(CAP:CAP原则又称CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼)

阶段一:提交事务请求(投票)

1.TM向所有的AP发送事务内容,询问是否可以执行事务的提交操作,并等待各个AP的响应

2.执行事务

各个AP节点执行事务操作,将undo和redo信息记录到事务日志中,尽量把提交过程中所消耗时间的操作和准备都提前完成后确保后续

事务提交的成功率

3.各个AP向TM反馈事务询问的响应

各个AP成功执行了事务操作,那么反馈给TM yes的response;如果AP没有成功执行事务,就反馈TM no的response

阶段二:执行事务提交

执行提交事务

假设一个事务的提交过程总共需要30s, 其中prepare操作需要28(事务日志落地磁盘及各种io操作),而真正commit只需要2s

那么,commit阶段发生错误的概率和prepare相比, 2/28 (<10%) .只要第一个阶段成功,那么commit阶段出现失败的概率就非常小

大大增加了分布式事务的成功概率

中断事务提交

2pc存在的问题

  1. 数据一致性问题
  2. 同步阻塞

3PC(three phase commit)

阶段一:canCommit      询问是否可以提交

阶段二:preCommit      进行预提交,类似于2pc中的预提交

阶段三:doCommit        提交

 

改进点

  1. 增加了超时机制
  2. 第二阶段,如果协调者超时没有接受到参与者的反馈,则自动认为失败,发送abort命令
  3. 第三阶段,如果参与者超时没有接受到协调者的反馈,则自动认为成功开始提交事务(基于概率)

3pc的问题

相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。

分布式事务的实现

JOTM(java open transaction manager)

分布式通信框架RMI

什么是RPC

Remote procedure call protocal

 

RPC协议其实是一个规范。Dubbo、Thrif、RMI、Webservice、Hessain

 

网络协议和网络IO对于调用端和服务端来说是透明;

 

一个RPC框架包含的要素

RMI的概述

RMI(remote method invocation)  , 可以认为是RPC的java版本

 

RMI使用的是JRMP(Java Remote Messageing Protocol), JRMP是专门为java定制的通信协议,所以踏实纯java的分布式解决方案

 

如何实现一个RMI程序

  1. 创建远程接口, 并且继承rmi.Remote接口
  2. 实现远程接口,并且继承:UnicastRemoteObject
  3. 创建服务器程序: createRegistry方法注册远程对象
  4. 创建客户端程序

 

自己实现RMI

  1. 编写服务器程序,暴露一个监听, 可以使用socket
  2. 编写客户端程序,通过ip和端口连接到指定的服务器,并且将数据做封装(序列化)
  3. 服务器端收到请求,先反序列化。再进行业务逻辑处理。把返回结果序列化返回

 

源码分析

为什么不用RMI:

因为是BIO,并且不能支持多语言,基于JDK的无法做负载,序列化使用Java的序列化

HTTP协议

HTTP协议的概述

1.客户端和服务器端

2.资源

html/文本、word、avi电影、其他资源

3.媒体类型

MIME类型。  text/html、 image/jpeg

4.URI和URL

URI:web服务器资源的名字。  index.html

http://www.buaahy.com:80/java/index.html[?query-string] #location

schema: http/https/ftp.

host: web服务器的ip地址或者域名

port: 服务端端口, http默认访问的端口是80

path: 资源访问路径

query-string: 查询参数

5.方法

GET/PUT/DELETE/POST/HEAD

报文

request参数、 response响应参数

request消息结构包含三部分: (起始行、首部字段、主体)

 

METHOD /path / http/version-number

Header-Name:value

空行

主体 optional request body

 

 

response

http/version-number   status code message
header-name:value

空行

body

 

状态码

http/1.1版本的协议里面定义了五种类型的状态码

1XX    提示信息

2XX    成功

3XX    重定向

4XX    客户端错误

5XX    服务器端的错误

 

HTTP协议的特点

1.无状态

cookie+session

2.多次请求

3.基于TCP协议

HTTPS

https由两部分组成,即:HTTP+SSL/TLS

数字证书里面包含的内容:公司信息、网站信息、数字证书的算法、公钥

 

连接过程

总结:

1.客户端发起一个https请求,请求里面包含一个随机数

a.客户端生成一个随机数(第一个随机数)

2.服务端收到请求后,拿到随机数,传输证书和自己生成一个随机数返回

a.证书(颁发机构(CA)、证书内容本身的数字签名(使用第三方机构的私钥加密)、证书持有者的公钥、证书签名用到的hash算法)

b.生成一个随机数,返回给客户端(第二个随机数)

3.客户端拿到证书以后和本地的证书做验证

a.根据颁发机构找到本地的根证书

b.根据CA得到根证书的公钥,通过公钥对数字签名解密,得到证书的内容摘要 A

c.用证书提供的算法对证书内容进行摘要,得到摘要 B

d.通过A和B的对比,也就是验证数字签名

4.验证通过以后,生成一个随机数(第三个随机数),通过证书内的公钥对这个随机数加密,发送给服务器端

5.(随机数1+2+3)通过对称加密得到一个密钥。(会话密钥)

6.以后通过会话密钥对内容进行对称加密传输

RESTful

REST  表述性状态转移

使用WEB标准来做一些准则和约束。

 

RESTful的基本概念

  1. 在REST中,一切的内容都被认为是一种资源
  2. 每个资源都由URI唯一标识
  3. 使用统一的接口处理资源请求(POST/GET/PUT/DELETE/HEAD)
  4. 无状态

 

资源和URI

  1. [/]表示资源的层级关系
  2. ?过滤资源
  3. 使用_或者-让URI的可读性更好

 

统一接口

GET  获取某个资源。 幂等

POST 创建一个新的资源

PUT 替换某个已有的资源(更新操作) , 幂等

DELETE 删除某个资源

PATCH/HEAD

 

资源表述

 

MIME 类型()

accept: text/xml   html文件

Content-Type告诉客户端资源的表述形式

 

RESTful的最佳设计

 

1.域名

http://api.buaahy.com

 

2.版本

http://api.buaahy.com/v1/user/1

header里面维护版本

3.路径

http://api.buaahy.com/v1/goods-list           //商品列表

 

4.过滤信息

https://api.github.com/user/repos?page=2&per_page=100

https://developer.github.com/v3/#rate-limiting

5.状态码

业务状态码

http状态码

函数式设计

Function设计:转换 (Integer -> Integer * 2)

Supplier设计: 没有接受数据,只有返回

Consumer设计:只接收数据,不返回(消费数据)

Predicate设计:判断 (1-10 挑选偶数)

package com.buaahy.stream;

import java.util.Arrays;
import java.util.List;

/**
 * @author hy
 * @date 2018-07-27 16:10
 */
public class Java8 {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        list.stream()
                //Predicate
                .filter(value -> value % 2 == 0)
                //Function
                .map(value -> value * 2)
                //Consumer
                .forEach(System.out::println);   

    }
}

结果:

RabbitMQ

1.概念

消息(Message)是指在应用间传送的数据。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

消息中间件(Message-Oriented Middleware)是指利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。(多个系统之间用了通讯的一种方式)

 

2.作用

a.实现异步

为什么不用多线程?多线程增加服务压力,对线程的维护开销较大。

 

b.实现解耦:原来要按顺序调用,现在顺序无关

c.流量削峰:防止短时的请求导致服务崩溃,将请求进行排队处理,如果超出请求队列大小,则抛弃请求跳转到错误页

d.实现广播

 

3.概念

AMQP概念:

AMQP实体模型:

channel:每个消费者和服务建立tcp连接,建立连接和释放连接会消耗大量资源,所以引入channel,建立连接后不进行释放,后续消息传递都在channel上进行。

4.三种主要Exchange(交换机)类型

a.Direct Exchange(直连)

b.Topic Exchange(主题模式)

c.Fanout Exchange(广播)

 

5.细节

a.队列重复声明:如果属性完全相同,再重复声明队列,会直接返回成功,但是修改不同属性会报错

注:队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。

b.消费者acknowledge:如果消费者不返回确认消息,则该消息不会消失,会造成重复消费的问题(消费者一直获取同一个消息进行处理)

c.不满足路由规则的消息:会被直接丢弃

d.消息持久化:如果要做持久化,必须Exchange和Queue都做持久化(durable:true),如果一个是true一个是false,则不允许绑定

e.多个消费者监听一个队列:会造成轮询消费(一个消费者消费一个,另一个消费者消费下一个)

MyBatis

Mybatis中#和$区别

1. #将传入的数据都当成一个字符串,会对自动传入的数据加一个双引号。如:order by #user_id#,如果传入的值是111,那么解析成sql时的值为order by “111”, 如果传入的值是id,则解析成的sql为order by “id”。#取变量对应的值。

2. $将传入的数据直接显示生成在sql中。如:order by $user_id$,如果传入的值是111,那么解析成sql时的值为order by user_id,  如果传入的值是id,则解析成的sql为order by id。$直接取变量的名字。

3. #方式可以防止sql注入,$不行。

MyBatis介绍及使用
1. 认识MyBatis
a) What is MyBatis? MyBatis is a first class persistence framework with support for custom SQL, stored procedures and advanced mappings. MyBatis eliminates almost all of the JDBC code and manual setting of parameters and retrieval of results. MyBatis can use simple XML or Annotations for configuration and map primitives, Map interfaces and Java POJOs (Plain Old Java Objects) to database records.
b) 对比JDBC和MyBatis
2. 使用MyBatis
a) 使用过程
i. 生成bean和配置文件
ii. 配置SqlSessionFactory
iii. 使用(两种方式)
1. Mapper.xml形式
2. Annotation形式

iv. SqlSessionFactory\SqlSession\Mapper 推荐作用域

b) MyBatis Generator使用
i. 配置generator插件

ii. generatorConfig.xml
iii. 执行 mvn mybatis-generator:generate
iv. 生成Bean和Example

3. XML映射配置文件
a) http://www.mybatis.org/mybatis-3/configuration.html
b) Type Handlers
i. 可以override原有的handlers
ii. 可以创造自己的handler补充没实现的类
c) objectFactory
d) plugins
i. 只能针对以下操作做plugin


ii. 步骤
1. 实现 org.apache.ibatis.plugin.Interceptor
2. 定义到底对谁做plugin

3. Plugin方法


4. 传参数

iii. 输出MyBatis日志
e) environments
4. Mapper文件认识
a. namespace 关联到mapper接口,区分类似package的作用
b. resultMap 结果映射
c. sql SQL复用
d. select Insert update delete CRDU操作
e. 动态SQL http://www.mybatis.org/mybatis-3/dynamic-sql.html
5. Batch批量操作
a. 三种方式
性能
for循环 低 每次都要IO
foreach拼SQL(性能最高) 有 SQL长度限制,定好List大小
ExeutorType.BATCH
6. 分页
a. 逻辑分页
org.apache.ibatis.executor.resultset.DefaultResultSetHandler#handleRowValuesForSimpleResultMap 内存分页而已
b. 物理分页
a) limt 0,10
b) 分页面插件 https://github.com/pagehelper/Mybatis-PageHelper
7. 联合查询

 

Tomcat架构

Tomcat架构

一、目录结构

1.conf目录:

catalina.policy:Tomcat安全策略文件,控制JVM相关权限,具体可以参考java.security.Permission

catalina.properties:Tomcat Catalina行为控制配置文件,比如 Common ClassLoader

logging.properties:Tomcat日志配置文件,JDK logging

server.xml:Tomcat Server配置文件

  •     GlobalNamingResources:全局JNDI资源

context.xml:全局Context配置文件

tomcat-users.xml:Tomcat角色配置文件,(Realm文件实现方式)

web.xml:Servlet标准的web.xml部署文件,Tomcat默认实现部分配置入内:

  • org.apache.catalina.servlets.DefaultServlet
  • org.apache.jasper.servlet.JspServlet

2.lib目录

Tomcat存放公用类库

ecj-*.jar:Eclipse Java编译器

jasper.jar:JSP编译器

 

3.logs目录

localhost.${date}.log:当Tomcat应用起不来的时候,多看该文件,比如:类冲突

  • NoClassDefFoundError
  • ClassNotFoundException

catalina.${date}.log:控制台输出,System.out外置(System中有setOut(PrintStream out) 方法控制打印到文件)

 

4.webapps目录

简化web应用的部署方式

 

二、部署Web应用

1.方法一:放置在webapps目录

直接将文件夹拖过去

2.方法二:修改conf/server.xml

在Host下添加Context元素:

<Context docBase=”${webAppAbsolutePath}” path=”/” reloadable=”true” />

可以添加多个,如,再添加一个:

<Context docBase=”${webAppAbsolutePath}” path=”/demo” reloadable=”true” />

 

熟悉配置元素可以参考:org.apache.catalina.core.StandardContext的setter方法

Container

  •     Context

该方法不支持动态部署,建议考虑在生产环境使用。

 

3.方法三:独立Context.xml配置文件

首先注意conf\Catalina\localhost

独立context XML配置文件路径:${TOMCAT_HOME}/conf/Catalina/localhost + ${ContextPath}.xml

注意:该方法可以实现热部署,因此建议在开发环境使用。

 

问题:

1.如果配置path的话 是以文件名为主还是以配置的为主

独立context XML配置文件时,设置path属性是无效的。

2.根独立context XML配置文件路径

${TOMCAT_HOME}/conf/${Engine.name}/${HOST.name} + ROOT.xml

3.如何实现热部署

调整<context>元素中的属性reloadable=”true”

4.连接器里面的线程池是用的哪个线程池

注意conf/server.xml文件中的注释:

org.apache.catalina.Executor:

public interface Executor extends java.util.concurrent.Executor, Lifecycle {
String getName();

void execute(Runnable var1, long var2, TimeUnit var4);
}

 

WordPress安装及迁移教程

1.安装XAMPP,https://www.apachefriends.org/download.html,XAMPP(Apache+MySQL+PHP+PERL)是一个功能强大的建站集成软件包。

2.安装完成后打开软件,依次启动MySQL和Apache,然后点击MySQL的Admin,进入到phpmyadmin,创建数据库密码。

进入到账户,新增用户账户,添加用户名密码等,然后给与所有权限,配置好后,点击右下角的执行按钮。

然后到数据库界面添加数据库,选择数据库编码,创建数据库。

 

3.下载WordPress,https://cn.wordpress.org/download/

减压后放到htdocs目录下,可以改名

4.访问localhost:80/wordpress,出现如下页面,进行安装

根据引导页面连接数据库,添加站点名字等。

5.安装完成后出现登录页面,登录后就可以管理后台了。

6.安装WordPress的导入工具,然后上传文件,因为文件超过2M,修改php.ini的配置文件,重新导入。注意修改后,页面上的最大上传文件大小会变(有时候修改不知道什么原因会不生效,所以注意页面是否变了),修改后重启Apache,重新导入即可。

参考:

windows下安装:https://jingyan.baidu.com/article/1e5468f955d65b484961b7bb.html

导入到vps:https://jingyan.baidu.com/article/2a138328e57dba074b134f63.html

端口占用问题:

 

 

2.因为本地已经安装过MySQL等,所以导致端口占用,查看报错,然后修改对应软件的Config里面的内容,换个端口,然后依次点击Start。

3.因为更改了MySQL的监听端口,所以PHPMySQL的配置也要更改,将$cfg[‘Servers’][$i][‘port’] = ‘33061’ 添加到相应位置(端口为自己MySQL配置的端口)