卧蚕眼,良陈美锦,倍儿爽-可爱精神,向不开心开战,开心文章、新闻,为你提供

频道:最近大事件 日期: 浏览:247

完结进程

经过canal解析binlog,canal首要模拟了mysql的Slave向Master发送恳求,当mysql有增修正查时则会动身恳求将数据发送到canal服务中,canal将数据存放到内存,直到客户端程序(canal服务端和客户端程序都是由java编写,且客户端逻辑由咱们凭借com.alibaba.otter.canal东西包下的类完结开发)经过发布-订阅这种形式消费canal服务中的数据。

装备mysql

1.在my.ini文件里的【mysqld】下增加如下:

重启mysql,留意假如是文件名是my-default.ini要改名成my.ini

2.创立mysql用户,并装备canal权限,进入到mysql程序中,输入下列指令,创立用户,(用户名和暗码都是:canal):

CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON数据库名.表名 TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON数据库名.表名 TO 'canal'@'%' ;

FLUSH PRIVILEGES;

下载并装备发动canal:

1.]官网地址:[https://github.com/alibaba/canal/releases,下载后解压

2.装备canal:首要装备的文件有两处,canal/conf/example/instance.properties和 canal/conf/canal.properties .下面临这两处做修正:

cd canal/conf/example/目录下对 instance.properties进行修正:

image.png

cd canal/conf/目录下对 canal.properties 进行修正:

3.发动canal:双击/bin/startup.bat文件发动

Java衔接canal履行同步操作:

1.在maven项目中中加载canal和redis依靠包:



com.alibaba.otter
canal.client
1.0.12


redis.clients
jedis
2.4.2


2.树立canal客户端,从canal中获取数据,并将数据更新至Redis:

package canal;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
/**
* @Author: daijunchen
* @Date: 2019/3/12 17:30
*/
public class ClientSample {
public static void main(String args[]) {
// 创立链接,hostname位canal服务器ip port位canal服务器端口,username,password可不填
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交承认
// connector.rollback(batchId); // 处理失利, 回滚数据
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
redisUpdate(rowData.getAfterColumnsList());
printColumn(rowData.getAfterColumnsList());
}
}
}
}
/**
* 打印改变的数据
*
* @param columns
*/
private static void printColumn(List columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
/**
* 数据刺进同步redis
*
* @param columns
*/
private static void redisInsert(List columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.stringSet("user:" + columns.get(0).getValue(), json.toJSONString());
}
}
/**
* 更新同步redis
*
* @param columns
*/
private static void redisUpdate(List columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.stringSet("user:" + columns.get(0).getValue(), json.toJSONString());
}
}
/**
* 数据删去同步redis
*
* @param columns
*/
private static void redisDelete(List columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.delKey("user:" + columns.get(0).getValue());
}
}
}

3.RedisUtil东西类,用来衔接redis:

package canal;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

/**

* @Author: daijunchen

* @Date: 2019/3/12 17:29

*/

public class RedisUtil {

// Redis服务器IP

private static String ADDR = "127.0.0.1";

// Redis的端口号

private static int PORT = 6379;

// 拜访暗码

private static String AUTH = "";

// 可用衔接实例的最大数目,默认值为8;

// 假如赋值为-1,则表明不约束;假如pool现已分配了maxActive个jedis实例,则此刻pool的状况为exhausted(耗尽)。

private static int MAX_ACTIVE = 1024;

// 操控一个pool最多有多少个状况为idle(闲暇的)的jedis实例,默认值也是8。

private static int MAX_IDLE = 200;

// 等候可用衔接的最大时刻,单位毫秒,默认值为-1,表明永不超时。假如超越等候时刻,则直接抛出JedisConnectionException;

private static int MAX_WAIT = 10000;

// 过期时刻

protected static int expireTime = 660 * 660 * 24;

// 衔接池

protected static JedisPool pool;

/**

* 静态代码,只在初度调用一次

*/

static {

JedisPoolConfig config = new JedisPoolConfig();

//最大衔接数

config.setMaxTotal(MAX_ACTIVE);

//最多闲暇实例

config.setMaxIdle(MAX_IDLE);

//超时时刻

config.setMaxWaitMillis(MAX_WAIT);

//

config.setTestOnBorrow(false);

pool = new JedisPool(config, ADDR, PORT, 1000);

}

/**

* 获取jedis实例

*/

protected static synchronized Jedis getJedis() {

Jedis jedis = null;

try {

jedis = pool.getResource();

} catch (Exception e) {

e.printStackTrace();

if (jedis != null) {

pool.returnBrokenResource(jedis);

}

}

return jedis;

}

/**

* 开释jedis资源

*

* @param jedis

* @param isBroken

*/

protected static void closeResource(Jedis jedis, boolean isBroken) {

try {

if (isBroken) {

pool.returnBrokenResource(jedis);

} else {

pool.returnResource(jedis);

}

} catch (Exception e) {

}

}

/**

* 是否存在key

*

* @param key

*/

public static boolean existKey(String key) {

Jedis jedis = null;

boolean isBroken = false;

try {

jedis = getJedis();

jedis.select(0);

return jedis.exists(key);

} catch (Exception e) {

isBroken = true;

} finally {

closeResource(jedis, isBroken);

}

return false;

}

/**

* 删去key

*

* @param key

*/

public static void delKey(String key) {

Jedis jedis = null;

boolean isBroken = false;

try {

jedis = getJedis();

jedis.select(0);

jedis.del(key);

} catch (Exception e) {

isBroken = true;

} finally {

closeResource(jedis, isBroken);

}

}

/**

* 获得key的值

*

* @param key

*/

public static String stringGet(String key) {

Jedis jedis = null;

boolean isBroken = false;

String lastVal = null;

try {

jedis = getJedis();

jedis.select(0);

lastVal = jedis.get(key);

jedis.expire(key, expireTime);

} catch (Exception e) {

isBroken = true;

} finally {

closeResource(jedis, isBroken);

}

return lastVal;

}

/**

* 增加string数据

*

* @param key

* @param value

*/

public static String stringSet(String key, String value) {

Jedis jedis = null;

boolean isBroken = false;

String lastVal = null;

try {

jedis = getJedis();

jedis.select(0);

lastVal = jedis.set(key, value);

jedis.expire(key, expireTime);

} catch (Exception e) {

e.printStackTrace();

isBroken = true;

} finally {

closeResource(jedis, isBroken);

}

return lastVal;

}

/**

* 增加hash数据

*

* @param key

* @param field

* @param value

*/

public static void hashSet(String key, String field, String value) {

boolean isBroken = false;

Jedis jedis = null;

try {

jedis = getJedis();

if (jedis != null) {

jedis.select(0);

jedis.hset(key, field, value);

jedis.expire(key, expireTime);

}

} catch (Exception e) {

isBroken = true;

} finally {

closeResource(jedis, isBroken);

}

}

}

验证修正mysql数据库内容后发现redis也实时更新了:

热门
最新
推荐
标签