应用场景
- 为什么要用redis?
二进制存储、java序列化传输、IO连接数高、连接频繁
一、序列化
这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口;
其代码如下:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Converts objects to and from {@code byte[]} using Java's built-in serialization.
 *
 * <p>In the sample project this class is declared in the {@code Utils} package.
 * Every object passed to {@link #object2Bytes(Object)} must implement
 * {@link java.io.Serializable}.
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {

    /**
     * Serializes an object into a byte array.
     *
     * @param obj the object to serialize; {@code null} is allowed and round-trips to {@code null}
     * @return the serialized form of {@code obj}
     * @throws IOException if the object (or something it references) is not serializable
     */
    public static byte[] object2Bytes(Object obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes anything it still buffers, so the
        // snapshot of the byte buffer is taken only after the try block has finished.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        }
        return buffer.toByteArray();
    }

    /**
     * Deserializes an object from a byte array produced by {@link #object2Bytes(Object)}.
     *
     * <p>NOTE(review): Java-native deserialization must only be applied to data from a
     * trusted source; do not feed it bytes that an untrusted party can write.
     *
     * @param bytes the serialized form, or {@code null} (for example when a queue pop
     *              found nothing)
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     * @throws Exception if the bytes are not a valid serialization stream or a class
     *                   cannot be resolved
     */
    public static Object bytes2Object(byte[] bytes) throws Exception {
        if (bytes == null) {
            // Callers null-check the result, so "no data" maps to "no object"
            // instead of a NullPointerException from ByteArrayInputStream.
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
二、消息类(实现Serializable接口)
import java.io.Serializable;

/**
 * The payload that travels through the Redis queue: a numeric id plus a text body.
 *
 * <p>In the sample project this class is declared in the {@code Model} package. It
 * implements {@link Serializable} so it can be turned into a {@code byte[]} before
 * being pushed onto a Redis list.
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -389326121047047723L;

    /** Identifier of this message. */
    private int id;

    /** Text carried by this message. */
    private String content;

    /**
     * Creates a message.
     *
     * @param id      identifier of the message
     * @param content text carried by the message
     */
    public Message(int id, String content) {
        this.id = id;
        this.content = content;
    }

    /** @return the identifier of this message */
    public int getId() {
        return this.id;
    }

    /** @param id the new identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the text carried by this message */
    public String getContent() {
        return this.content;
    }

    /** @param content the new text */
    public void setContent(String content) {
        this.content = content;
    }
}
三、Redis的操作
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:只允许在一端(队尾)插入新元素,在另一端(队头)取出元素,遵循FIFO(先进先出)原则。
Redis中lpush(头入)配合rpop(尾出),或rpush(尾入)配合lpop(头出),都可以满足要求;而Redis中list要push或pop的对象仅需要转换成byte[]即可。Java采用Jedis进行Redis的存储和Redis的连接池设置。
上代码:
package Utils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.util.List; import java.util.Map; import java.util.Set; /*** Created by Kinglf on 2016/10/17.*/ public class JedisUtil {private static String JEDIS_IP;private static int JEDIS_PORT;private static String JEDIS_PASSWORD;private static JedisPool jedisPool;static {//Configuration自行写的配置文件解析类,继承自PropertiesConfiguration conf=Configuration.getInstance();JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");JEDIS_PORT=conf.getInt("jedis.port",6379);JEDIS_PASSWORD=conf.getString("jedis.password",null);JedisPoolConfig config=new JedisPoolConfig();config.setMaxActive(5000);config.setMaxIdle(256);config.setMaxWait(5000L);config.setTestOnBorrow(true);config.setTestOnReturn(true);config.setTestWhileIdle(true);config.setMinEvictableIdleTimeMillis(60000L);config.setTimeBetweenEvictionRunsMillis(3000L);config.setNumTestsPerEvictionRun(-1);jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);}/*** 获取数据* @param key* @return*/public static String get(String key){String value=null;Jedis jedis=null;try{jedis=jedisPool.getResource();value=jedis.get(key);}catch (Exception e){jedisPool.returnBrokenResource(jedis);e.printStackTrace();}finally {close(jedis);}return value;}private static void close(Jedis jedis) {try{jedisPool.returnResource(jedis);}catch (Exception e){if(jedis.isConnected()){jedis.quit();jedis.disconnect();}}}public static byte[] get(byte[] key){byte[] value = null;Jedis jedis = null;try {jedis = jedisPool.getResource();value = jedis.get(key);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return value;}public static void set(byte[] key, byte[] value) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.set(key, value);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}public 
static void set(byte[] key, byte[] value, int time) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.set(key, value);jedis.expire(key, time);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}public static void hset(byte[] key, byte[] field, byte[] value) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.hset(key, field, value);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}public static void hset(String key, String field, String value) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.hset(key, field, value);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}/*** 获取数据** @param key* @return*/public static String hget(String key, String field) {String value = null;Jedis jedis = null;try {jedis = jedisPool.getResource();value = jedis.hget(key, field);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return value;}/*** 获取数据** @param key* @return*/public static byte[] hget(byte[] key, byte[] field) {byte[] value = null;Jedis jedis = null;try {jedis = jedisPool.getResource();value = jedis.hget(key, field);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return value;}public static void hdel(byte[] key, byte[] field) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.hdel(key, field);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}/*** 存储REDIS队列 顺序存储* @param key reids键名* @param value 键值*/public static void lpush(byte[] key, byte[] value) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.lpush(key, value);} catch (Exception e) 
{//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}/*** 存储REDIS队列 反向存储* @param key reids键名* @param value 键值*/public static void rpush(byte[] key, byte[] value) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.rpush(key, value);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}/*** 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端* @param key reids键名* @param destination 键值*/public static void rpoplpush(byte[] key, byte[] destination) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.rpoplpush(key, destination);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}/*** 获取队列数据* @param key 键名* @return*/public static List lpopList(byte[] key) {List list = null;Jedis jedis = null;try {jedis = jedisPool.getResource();list = jedis.lrange(key, 0, -1);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return list;}/*** 获取队列数据* @param key 键名* @return*/public static byte[] rpop(byte[] key) {byte[] bytes = null;Jedis jedis = null;try {jedis = jedisPool.getResource();bytes = jedis.rpop(key);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return bytes;}public static void hmset(Object key, Map hash) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.hmset(key.toString(), hash);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}public static void hmset(Object key, Map hash, int time) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.hmset(key.toString(), hash);jedis.expire(key.toString(), time);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}public 
static List hmget(Object key, String... fields) {List result = null;Jedis jedis = null;try {jedis = jedisPool.getResource();result = jedis.hmget(key.toString(), fields);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return result;}public static Set hkeys(String key) {Set result = null;Jedis jedis = null;try {jedis = jedisPool.getResource();result = jedis.hkeys(key);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return result;}public static List lrange(byte[] key, int from, int to) {List result = null;Jedis jedis = null;try {jedis = jedisPool.getResource();result = jedis.lrange(key, from, to);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return result;}public static Map hgetAll(byte[] key) {Map result = null;Jedis jedis = null;try {jedis = jedisPool.getResource();result = jedis.hgetAll(key);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return result;}public static void del(byte[] key) {Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.del(key);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}}public static long llen(byte[] key) {long len = 0;Jedis jedis = null;try {jedis = jedisPool.getResource();jedis.llen(key);} catch (Exception e) {//释放redis对象jedisPool.returnBrokenResource(jedis);e.printStackTrace();} finally {//返还到连接池close(jedis);}return len;} }
四、Configuration主要用于读取Redis的配置信息
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Reads the application's settings (used here for the Redis connection) from the
 * XML properties resource {@code config.xml} and exposes typed getters with defaults.
 *
 * <p>In the sample project this class is declared in the {@code Utils} package.
 * A value that is missing or empty always yields the caller-supplied default.
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {

    private static final long serialVersionUID = -2296275030489943706L;

    /** Name of the classpath resource the settings are loaded from. */
    private static final String CONFIG_RESOURCE = "config.xml";

    private static Configuration instance = null;

    /**
     * Loads {@code config.xml} through the system class loader. If the resource is
     * missing or unreadable a warning is written to {@code System.err} and the
     * instance stays empty, so every getter falls back to its default.
     */
    public Configuration() {
        try (InputStream in =
                     ClassLoader.getSystemClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                // Previously this case ended in a NullPointerException inside loadFromXML.
                System.err.println("Configuration: resource '" + CONFIG_RESOURCE
                        + "' not found on the classpath, using default values");
                return;
            }
            this.loadFromXML(in);
        } catch (IOException ioe) {
            // Previously swallowed silently; keep going with defaults but say why.
            System.err.println("Configuration: cannot read '" + CONFIG_RESOURCE + "': " + ioe);
        }
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the process-wide configuration
     */
    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }

    /**
     * Looks up a property, treating an empty value like a missing one.
     *
     * @param key          the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property value or {@code defaultValue}
     */
    @Override
    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return isAbsent(val) ? defaultValue : val;
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property as a string, or {@code defaultValue}
     */
    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property parsed as an {@code int}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid {@code int}
     */
    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return isAbsent(val) ? defaultValue : Integer.parseInt(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property parsed as a {@code long}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid {@code long}
     */
    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        // Long.parseLong, not Integer.parseInt: values beyond the int range are legal here.
        return isAbsent(val) ? defaultValue : Long.parseLong(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property parsed as a {@code float}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid {@code float}
     */
    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return isAbsent(val) ? defaultValue : Float.parseFloat(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property parsed as a {@code double}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid {@code double}
     */
    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return isAbsent(val) ? defaultValue : Double.parseDouble(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to return when the property is missing or empty
     * @return the property parsed as a {@code byte}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid {@code byte}
     */
    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return isAbsent(val) ? defaultValue : Byte.parseByte(val);
    }

    /** Tells whether a raw property value counts as "not configured". */
    private static boolean isAbsent(String val) {
        return val == null || val.isEmpty();
    }
}
五、测试
import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Demo of the queue: loading the class fills the Redis list with messages (producer),
 * and {@link #main(String[])} pops one message and prints it (consumer).
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {

    /** Redis key of the list used as the queue. */
    public static byte[] redisKey = "key".getBytes(StandardCharsets.UTF_8);

    /** Number of messages pushed when the class is loaded. */
    private static final int MESSAGE_COUNT = 1000000;

    static {
        // Producer side: runs once, when the class is initialized.
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Pushes {@code MESSAGE_COUNT} serialized messages onto the head of the list.
     *
     * @throws IOException if a message cannot be serialized
     */
    private static void init() throws IOException {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message message = new Message(i, "这是第" + i + "个内容");
            JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
        }
    }

    /**
     * Pops a single message and prints it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            pop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Takes one element from the tail of the list and prints its id and content.
     * Elements were pushed at the head, so they come out in insertion order (FIFO).
     *
     * @throws Exception if the popped bytes cannot be deserialized
     */
    private static void pop() throws Exception {
        byte[] bytes = JedisUtil.rpop(redisKey);
        if (bytes == null) {
            // Empty queue (or failed command): nothing to deserialize. Checking here,
            // before bytes2Object, avoids handing null to the deserializer.
            return;
        }
        Message msg = (Message) ObjectUtil.bytes2Object(bytes);
        if (msg != null) {
            System.out.println(msg.getId() + "----" + msg.getContent());
        }
    }
}
每执行一次pop()方法,结果如下:
<br>1----这是第1个内容
<br>2----这是第2个内容
<br>3----这是第3个内容
<br>4----这是第4个内容
总结
至此,整个Redis消息队列的生产者和消费者代码已经完成
- Message
需要传送的实体类(需实现Serializable接口)
- Configuration
Redis的配置读取类,继承自Properties
- ObjectUtil
将对象和byte数组双向转换的工具类
- JedisUtil
通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类