手机怎么防止网站跳转/星沙网站优化seo
1,源码重点
2,编译
编译成功之后将cep的包替换maven库对应的cep jar
1)我们编译成功之后得到了cep的包:
2)去本地库替换maven库里面的这个cep包
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.11</artifactId><version>${project.version}</version>
</dependency>
3,运行案例代码:
package org.apache.flink.streaming.examples.cep;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.RichPatternSelectFunction;
import org.apache.flink.cep.listern.CepListener;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Map;/**
* @Description:
* @Author: greenday
* @Date: 2019/8/15 14:05
*/
public class Driver {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 数据源
SingleOutputStreamOperator<Tuple3<String, Long, String>> source = env.fromElements(
new Tuple3<String, Long, String>("a", 1000000001000L, "22")
, new Tuple3<String, Long, String>("b", 1000000002000L, "23")
, new Tuple3<String, Long, String>("c", 1000000003000L, "23")
, new Tuple3<String, Long, String>("d", 1000000003000L, "23")
, new Tuple3<String, Long, String>("change", 1000000003001L, "23")
, new Tuple3<String, Long, String>("e", 1000000004000L, "24")
, new Tuple3<String, Long, String>("f", 1000000005000L, "23")
, new Tuple3<String, Long, String>("g", 1000000006000L, "23")
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple3<String, Long, String>>() {
long maxTimsStamp;
@Nullable
public Watermark checkAndGetNextWatermark(Tuple3<String, Long, String> stringLongStringTuple3, long l) {
return new Watermark(maxTimsStamp - 1000);
}public long extractTimestamp(Tuple3<String, Long, String> stringLongStringTuple3, long per) {
long elementTime = stringLongStringTuple3.f1;
if (elementTime > maxTimsStamp) {
maxTimsStamp = elementTime;
}
return elementTime;
}
});Pattern<Tuple3<String, Long, String>,?> pattern = Pattern
.<Tuple3<String, Long, String>>begin("start").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {
@Override
public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {
return value.f0.equals("a");
}
})
.next("middle").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {
@Override
public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {
return value.f0.equals("b");
}
})
.within(Time.minutes(5));PatternStream patternStream = CEP.pattern(source, pattern);
PatternStream patternstream = patternStream
// 更新逻辑
.registerListener(new CepListener<Tuple3<String, Long, String>>(){
@Override
public Boolean needChange(Tuple3<String, Long, String> element) {
return element.f0.equals("change");
}
@Override
public Pattern returnPattern() {
Pattern<Tuple3<String, Long, String>, ?> pattern = Pattern
.<Tuple3<String, Long, String>>begin("start").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {
@Override
public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {
return value.f0.equals("e");
}
})
.next("middle").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {
@Override
public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {
return value.f0.equals("f");
}
})
.next("end").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {
@Override
public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {
return value.f0.equals("g");
}
})
.within(Time.minutes(5));
return pattern;
}
});patternstream.select(new RichPatternSelectFunction<Tuple3<String, Long, String>, Tuple3<String,String,String>>() {
@Override
public Tuple3 select(Map pattern) throws Exception {
String start = pattern.containsKey("start") ? ((ArrayList)pattern.get("start")).get(0).toString() : "" ;
String middle = pattern.containsKey("middle") ? ((ArrayList)pattern.get("middle")).get(0).toString() : "" ;
String end = pattern.containsKey("end") ? ((ArrayList)pattern.get("end")).get(0).toString() : "" ;
return new Tuple3<String,String,String>(start,middle,end);
}
}).print();env.execute("cep");
}
}
4,结果展示: