当前位置: 首页 > news >正文

vue做单页面网站3322免费域名注册

vue做单页面网站,3322免费域名注册,上海做网站cnsosu,农业信息网站建设概念导读在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询。正常的思路可能是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下一个步骤。如果数据库…

导读

在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询。正常的思路可能是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下一个步骤。如果数据库查询时间很长,那有可能会阻塞流计算的整体流程。因此可以考虑异步的方式请求数据库,当数据返回时,该事件再继续执行下面的操作。这样提升了流计算的并发度,但是也增加了数据库的访问以及网络带宽的压力。

1 Flink中的异步IO

在Flink中提供了一种异步IO的模式,不需要使用map函数阻塞式的加载数据,而是使用异步方法同时处理大量请求。不过这就需要数据库支持异步请求,如果不支持异步请求也可以手动维护线程池调用,只不过效率上没有原生的异步client更高效。比如Mysql可以通过Vertx支持异步查询,HBase2.x也支持异步查询。

一般要实现Flink的异步查询需要自定义几个方法:

class MyAsyncReq extends RichAsyncFunction{

@Override

public void open(..) throws Exception {}

@Override

public void close() throws Exception {}

@Override

public void asyncInvoke(..) throws Exception {}

}

其中open中需要定义连接或者连接池,close中进行释放,asyncInvoke执行异步查询。

AsyncDataStream.unorderedWait(

stream,

new MyAsyncReq(),

1000,

TimeUnit.MILLISECONDS,

100);

使用的使用执行下面的方法即可,

stream为主要的事件流,

myasyncreq是异步IO类,

1000为异步请求的超时时间,

100是同时进行异步请求的最大数量

另外,由于是异步请求,所以可能请求结束后顺序与原来的顺序就不一致了。使用unordered时会以异步请求结束的时间为准,ordered会以事件时间为准。

2 基于Vertx实现的Mysql异步IO

如果外部数据源是Mysql,一般的jdbc连接都是同步机制的,看浪尖大大的文章,推荐了一个异步JDBC组件——Vertx,下面就以Vertx为例作为异步IO的Client。

maven引入除flink之外其他的jar:

mysql

mysql-connector-java

8.0.13

io.vertx

vertx-jdbc-client

3.8.3

io.vertx

vertx-core

3.8.3

先在open中创建SQLClient,它内部维护了自己的异步请求服务;然后再close中关闭client;在asyncInvoke中调用获取connection,执行查询,并释放连接。

public class JDBCAsyncFunction extends RichAsyncFunction {

private SQLClient client;

@Override

public void open(Configuration parameters) throws Exception {

Vertx vertx = Vertx.vertx(new VertxOptions()

.setWorkerPoolSize(10)

.setEventLoopPoolSize(10));

JsonObject config = new JsonObject()

.put("url", "jdbc:mysql://xx:3306/base")

.put("driver_class", "com.mysql.cj.jdbc.Driver")

.put("max_pool_size", 10)

.put("user", "x")

.put("password", "x");

client = JDBCClient.createShared(vertx, config);

}

@Override

public void close() throws Exception {

client.close();

}

@Override

public void asyncInvoke(Click input, ResultFuture resultFuture) throws Exception {

client.getConnection(conn -> {

if (conn.failed()) {

return;

}

final SQLConnection connection = conn.result();

connection.query("select id, name from t where id = " + input.getId(), res2 -> {

ResultSet rs = new ResultSet();

if (res2.succeeded()) {

rs = res2.result();

}

List stores = new ArrayList<>();

for (JsonObject json : rs.getRows()) {

Store s = new Store();

s.setId(json.getInteger("id"));

s.setName(json.getString("name"));

stores.add(s);

}

connection.close();

resultFuture.complete(stores);

});

});

}

}

注意,一定要在query的返回调用方法中手动释放connection,不然马上就会报连接池耗尽的异常。使用时就没什么区别了:

AsyncDataStream

.unorderedWait(clicks,new JDBCPoolFunction(), 100,TimeUnit.SECONDS,10)

.print();

3 参考

1 vertx:

2 设计思想参考:

http://www.lbrq.cn/news/2499139.html

相关文章:

  • 宿州做企业网站公司美区下载的app怎么更新
  • 东营做网站公司网络营销的概念及内容
  • 国外平面设计师常看的网站名优网站关键词优化
  • 企业铭做网站免费网站站长查询
  • 新网站怎么做才会被收录软文广告素材
  • 百中搜网站建设媒体资源网
  • 珠海做网站方案杭州百度推广优化排名
  • 网站个免费的空间国外搜索引擎大全百鸣
  • 成都网站建设 3e如何推广好一个产品
  • 做vip视频网站侵权企业网站seo诊断工具
  • 电商网站如何做免费发布推广的网站
  • 品牌型网站建设推广网站seo
  • 谁有人和兽做的网站?策划推广
  • 网站 keywords seo关键词排名优化工具有用吗
  • 本科网站开发毕业设计国外免费域名申请
  • 有了域名之后如何做网站红河网站建设
  • 静态网站是什么样网络营销的方式
  • wordpress微信网站模板凤凰军事新闻最新消息
  • 做淘宝的导购网站百度点击软件找名风
  • 成都交易网站建设做app的网站
  • wordpress 内容 管理员查看曲靖seo
  • 怎么快速建动态网站自己的网站怎么做seo
  • 网站被入侵后需做的检测 1代写文案平台
  • 网站下拉广告网络广告策划与制作
  • 企业邮箱免费版注册windows优化大师是官方的吗
  • 网站建设作品图片网站生成
  • 白熊阅读做网站架构我想在百度发布信息
  • 中文免费网站模板吉林关键词排名优化软件
  • APP开发网站建设哪家好seo外包公司需要什么
  • 网站的手机客户端怎样做黑龙江最新疫情
  • Android 解析 TrafficDescriptor 的 OSAPP 信息
  • 依赖倒置原则 Dependency Inversion Principle - DIP
  • PROFINET转CAN通讯协议转换速通汽车制造
  • JavaEE初阶第十二期:解锁多线程,从 “单车道” 到 “高速公路” 的编程升级(十)
  • 集合框架学习
  • 零基础学习性能测试第三章:jmeter性能组件应用(事件,并发,定时器)