• 手机版

    扫码体验手机版

  • 微信公众号

    扫码关注公众号

游客您好
第三方账号登陆
  • 点击联系客服

    在线时间:8:00-16:00

    客服电话

    400-123-4567

    电子邮件

    1691000615@qq.com
  • 星点互联APP

    随时掌握企业动态

  • 扫描二维码

    关注星点微信公众号

Flink中的多source+event watermark测试

Wotchin-本文作者
362 0 2019-1-12 12:03
摘要

这次需要做一个监控项目,全网日志的指标计算,上线的话,计算量应该是百亿/天单个source对应的sql如下最原始的sql select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl from ( select pro,throwable,lev ...

这次需要做一个监控项目,全网日志的指标计算,上线的话,计算量应该是百亿/天

单个source对应的sql如下

最原始的sql

select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl  

from 
(

select pro,throwable,level,ip,
count(*) as `count`,
lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
firstLong(l) as firstl,
lastLong(l) as lastl,
TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time` 

from input.`ymm-appmetric-dev-self1` 

where 
pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 
group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)

) 

where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)

---先做技术论证,写了下面一个sql

select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl  

from (

select pro,throwable,level,ip,count(*) as `count`,
lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
firstLong(l) as firstl,
lastLong(l) as lastl,
TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time` 
from (

select pro,throwable,level,ip
from input.`ymm-appmetric-dev-self1` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 
union
select pro,throwable,level,ip
from input.`ymm-appmetric-dev-self2` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 

)

group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)

)

where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)

然后拉起flink任务,观察是否可顺利启动---果然报错了

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'SPT' not found in any table

定位一下,看看是什么问题导致的,看了下之前写的sql,猜测是因为UNION的时候,没有在每个表里带上SPT时间属性字段以及其它字段,补上后sql如下

select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl  

from (

select pro,throwable,level,ip,count(*) as `count`,
lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
firstLong(l) as firstl,
lastLong(l) as lastl,
TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time` 
from (

select pro,throwable,level,ip,l,KAFKA_TOPIC,KAFKA_PARTITION,KAFKA_OFFSET,SPT
from input.`ymm-appmetric-dev-self1` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 
union
select pro,throwable,level,ip,l,KAFKA_TOPIC,KAFKA_PARTITION,KAFKA_OFFSET,SPT
from input.`ymm-appmetric-dev-self2` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 

)

group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)

)

where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)

再重启看看,这次应该差不多了吧---sql可以顺利编译,但是还是有错

奇怪了,之前并没有这样的错误,赞,我们来看看问题在哪!

 

我们打开类的层次图如下

借这个机会加强对这些类的理解!

---经过我的调试,发现问题出现在union上,不加这个Union,啥事没有;加了就报错,下面我们再回到调用栈看看

一个人调试了一个下午,-_-||,最终发现知道修改一个地方就行

union -> union all

厉害了,给大佬低头!

----好,既然解决了,我们继续来debug原理层!

测试了一下,发现多source跟单source相比,单source的watermark很好理解,但是多source就稍微复杂些,下面我们来研究下原理!

首先,观察一下现有的图,如下所示:

下面再来研究一下线程,jstack一把

我们来分析上面的线程,看看有没有收获!挑几个重点线程讲解

"VM Periodic Task Thread" os_prio=0 tid=0x00007f366825e800 nid=0x63d waiting on condition 
百度可以知道
该线程是JVM周期性任务调度的线程,它由WatcherThread创建,是一个单例对象。该线程在JVM内使用得比较频繁,比如:定期的内存监控、JVM运行状况监控。
下面几个是GC线程
"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668031800 nid=0x626 runnable 

"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668033800 nid=0x627 runnable 

"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668035800 nid=0x628 runnable 

"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668037800 nid=0x629 runnable 

"Gang worker#4 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668039800 nid=0x62a runnable 

"Gang worker#5 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803b000 nid=0x62b runnable 

"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803d000 nid=0x62c runnable 

"Gang worker#7 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803f000 nid=0x62d runnable 

"Concurrent Mark-Sweep GC Thread" os_prio=0 tid=0x00007f36680b7000 nid=0x630 runnable 

"Gang worker#0 (Parallel CMS Threads)" os_prio=0 tid=0x00007f36680b2800 nid=0x62e runnable 

"Gang worker#1 (Parallel CMS Threads)" os_prio=0 tid=0x00007f36680b4800 nid=0x62f runnable 

---

"main" #1 prio=5 os_prio=0 tid=0x00007f3668019800 nid=0x625 waiting on condition [0x00007f3670010000]
主线程,在flink内部等待所有事情结束
"New I/O worker #1" #24 prio=5 os_prio=0 tid=0x00007f366995f000 nid=0x648 runnable [0x00007f3642cd1000]
内部netty线程

---

"Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #51 prio=5 os_prio=0 tid=0x00007f363d11a800 nid=0x65e in Object.wait() [0x00007f3641ac3000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)
	- locked <0x00000000e6ee2df0> (a java.lang.Object)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:133)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)

"Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #50 prio=5 os_prio=0 tid=0x00007f363d120800 nid=0x65d in Object.wait() [0x00007f3641bc4000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)
	- locked <0x00000000e6ee2e98> (a java.lang.Object)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:133)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)

有2个线程是用来获取消息,对于这2个线程来说,这2个消息不是直接读取kafka,而是其它线程读取kafka喂给这2个线程

---

"time attribute: (SPT) (1/1)" #53 prio=5 os_prio=0 tid=0x00007f363d8e4000 nid=0x662 in Object.wait() [0x00007f36418c1000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:205)
	- locked <0x00000000e6ee8210> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
这个线程对应了我们sql里的union算子

---

"groupBy: (pro, throwable, level, ip), window: (TumblingGroupWindow('w$, 'SPT, 3000.millis)), select: (pro, throwable, level, ip, COUNT(*) AS count, lastStrInGroupSkipNull($f5) AS id, firstLong(l) AS firstl, lastLong(l) AS lastl, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (=(1, uniqueWithin100MS(pro, throwable, _UTF-16LE'ERROR', ip, w$end))), select: (pro, throwable, level, ip, count, id, w$end AS time, firstl, lastl) -> to: Row -> Sink: Kafka010JsonTableSink(pro, throwable, level, ip, count, id, time, firstl, lastl) (1/1)" #54 prio=5 os_prio=0 tid=0x00007f363fde3800 nid=0x664 in Object.wait() [0x00007f3641127000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:533)
	- locked <0x00000000e6ee2d48> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
这个对应了group by算子

---生产者

"kafka-producer-network-thread | producer-1" #55 daemon prio=5 os_prio=0 tid=0x00007f364d0f0800 nid=0x667 runnable [0x00007f3640a26000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	- locked <0x00000000e6ef3358> (a sun.nio.ch.Util$3)
	- locked <0x00000000e6ef3340> (a java.util.Collections$UnmodifiableSet)
	- locked <0x00000000e6eedbd8> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at org.apache.kafka.common.network.Selector.select(Selector.java:489)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
	at java.lang.Thread.run(Thread.java:748)
对应着生产者,直连kafka

---

"Time Trigger for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #57 daemon prio=5 os_prio=0 tid=0x00007f364d264800 nid=0x669 waiting on condition [0x00007f3640624000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000e6ef84c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

"Time Trigger for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #56 daemon prio=5 os_prio=0 tid=0x00007f363e937800 nid=0x668 waiting on condition [0x00007f3640725000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000e6ee2bc8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
每个流对应着一个水印定时发送线程,因为我这边的输入是2个流
所以有2个水印发送线程

---

 
                       
                    
                    

鲜花

握手

雷人

路过

鸡蛋
本文暂无评论,快来抢沙发!

微信关注“星点教育培训”每天分享有用的课程资源
阅读排行