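RegexExtractorInterceptor is an Interceptor implementation that matches a regular expression against the event body, extracts the captured strings, and uses serializers to store those strings as header values.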
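Example: when logs are collected with an exec source through the script below, each line is prefixed with the file name and host name, so a different header can be set according to the file name and the events can be handled differently.

    #!/bin/sh
    filename=$1
    hostname=`hostname -s`
    tail -F "$1" | awk -v filename="$filename" -v hostname="$hostname" '{print filename":"hostname":"$0}'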
Source configuration:

    xxxx.sources.kafka1.interceptors = i1
    xxxx.sources.kafka1.interceptors.i1.type = regex_extractor
    xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/
    xxxx.sources.kafka1.interceptors.i1.serializers = s1
    xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename
    xxxx.sources.kafka1.selector.type = multiplexing
    xxxx.sources.kafka1.selector.header = logtypename
    xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel

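serializers defines how the capture groups are handled: once the regex matches, the value of each group becomes a header value. For example, if the event body is 1:2:3.4foobar5, regex is (\\d):(\\d):(\\d), serializers is set to a b c, and serializers.a.name, serializers.b.name and serializers.c.name are one, two and three, then the headers one=1, two=2 and three=3 are added. Each (\\d) captures a single digit, so the trailing .4foobar5 does not end up in any header. Note that not every group needs a serializer; groups beyond the configured serializers are skipped.
serializers.x.name is the name of the event header that receives the value.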
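The group-to-header mapping described above can be tried outside Flume with plain java.util.regex. This is a minimal sketch, not Flume's code: the class HeaderExtractionSketch and its extractHeaders method are hypothetical names, and the sample line /apps/logs/nginx/access.log:web01:GET /index.html is an assumed example of the filename:hostname:line format produced by the script.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper imitating how capture groups become headers.
// It is not part of Flume; it only relies on java.util.regex.
final class HeaderExtractionSketch {

    // Maps capture group i (1-based) to headerNames.get(i - 1).
    // Groups without a corresponding name are ignored.
    static Map<String, String> extractHeaders(String regex, List<String> headerNames, String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher matcher = Pattern.compile(regex).matcher(body);
        if (matcher.find()) { // a match anywhere in the body is enough
            int usable = Math.min(matcher.groupCount(), headerNames.size());
            for (int group = 1; group <= usable; group++) {
                headers.put(headerNames.get(group - 1), matcher.group(group));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        // In Java source, "(\\d)" is the regex (\d): exactly one digit per group.
        System.out.println(extractHeaders("(\\d):(\\d):(\\d)",
                Arrays.asList("one", "two", "three"), "1:2:3.4foobar5"));
        // Only two names for three groups: the third group is dropped.
        System.out.println(extractHeaders("(\\d):(\\d):(\\d)",
                Arrays.asList("one", "two"), "1:2:3.4foobar5"));
        // Assumed sample line in the filename:hostname:line format.
        System.out.println(extractHeaders("/apps/logs/(.*?)/",
                Arrays.asList("logtypename"),
                "/apps/logs/nginx/access.log:web01:GET /index.html"));
    }
}
```

Run as written, main prints {one=1, two=2, three=3}, then {one=1, two=2}, then {logtypename=nginx}. The last value is what the multiplexing selector in the configuration would compare against its mapping.nginx entry.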
1) The configureSerializers method, called while the interceptor's Builder is being configured, parses the serializers settings:

    private List<NameAndSerializer> serializerList;
    private final RegexExtractorInterceptorSerializer defaultSerializer =
        new RegexExtractorInterceptorPassThroughSerializer();
    ....
    private void configureSerializers(Context context) {
      // Read the "serializers" setting.
      String serializerListStr = context.getString(SERIALIZERS);
      Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
          "Must supply at least one name and serializer");

      // Serializer names are separated by whitespace.
      String[] serializerNames = serializerListStr.split("\\s+");

      Context serializerContexts =
          new Context(context.getSubProperties(SERIALIZERS + "."));

      serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
      // Process the settings of each configured serializer.
      for (String serializerName : serializerNames) {
        Context serializerContext = new Context(
            serializerContexts.getSubProperties(serializerName + "."));
        // serializers.x.type; the default is DEFAULT, i.e.
        // org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer.
        String type = serializerContext.getString("type", "DEFAULT");
        // serializers.x.name
        String name = serializerContext.getString("name");
        Preconditions.checkArgument(!StringUtils.isEmpty(name),
            "Supplied name cannot be empty.");

        if ("DEFAULT".equals(type)) {
          // Build a NameAndSerializer and add it to the List<NameAndSerializer>.
          // The static nested class NameAndSerializer is a container holding a
          // headerName and a RegexExtractorInterceptorSerializer; each
          // serializers.x.name setting corresponds to one such serializer object.
          serializerList.add(new NameAndSerializer(name, defaultSerializer));
        } else {
          // getCustomSerializer returns the RegexExtractorInterceptorSerializer
          // for the configured type.
          serializerList.add(new NameAndSerializer(name,
              getCustomSerializer(type, serializerContext)));
        }
      }
    }

Here org.apache.flume.interceptor.RegexExtractorInterceptorSerializer is an interface that declares one abstract method, serialize. Its implementations include:

    org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer  // returns the value unchanged, with no further processing (the default)
    org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer       // converts the value to milliseconds using the specified formatting pattern

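The difference between the two implementations can be illustrated with a stand-alone sketch. Everything below is a hypothetical stand-in rather than Flume's own classes: the ValueSerializer interface, the PassThroughSketch and MillisSketch classes, the use of java.text.SimpleDateFormat, and the choice of UTC as the time zone are all assumptions made for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical sketch of the two serializer behaviours; not Flume code.
final class SerializerSketch {

    // Stand-in for the serializer contract: matched text in, header value out.
    interface ValueSerializer {
        String serialize(String value);
    }

    // Pass-through idea: the matched text is used unchanged.
    static final class PassThroughSketch implements ValueSerializer {
        @Override
        public String serialize(String value) {
            return value;
        }
    }

    // Millis idea: parse the matched text with a pattern, return epoch milliseconds.
    static final class MillisSketch implements ValueSerializer {
        private final String pattern;

        MillisSketch(String pattern) {
            this.pattern = pattern;
        }

        @Override
        public String serialize(String value) {
            // SimpleDateFormat is not thread-safe, so this sketch creates one per call.
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            format.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: input is in UTC
            try {
                return Long.toString(format.parse(value).getTime());
            } catch (ParseException e) {
                throw new IllegalArgumentException(
                        "Cannot parse '" + value + "' with pattern " + pattern, e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new PassThroughSketch().serialize("nginx"));
        System.out.println(new MillisSketch("yyyy-MM-dd HH:mm").serialize("1970-01-01 00:01"));
    }
}
```

With these assumptions, the pass-through call returns nginx unchanged and the millis call returns 60000, one minute after the epoch. A header produced this way is still a string, which is why the sketch converts the long back to text.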
2) The build method returns a RegexExtractorInterceptor object:

    return new RegexExtractorInterceptor(regex, serializerList);

The main method of RegexExtractorInterceptor is intercept:

    static final String REGEX = "regex";
    static final String SERIALIZERS = "serializers";
    ...
    public Event intercept(Event event) {
      // Run the regex against the event body.
      Matcher matcher = regex.matcher(
          new String(event.getBody(), Charsets.UTF_8));
      // The event's header key/value pairs.
      Map<String, String> headers = event.getHeaders();
      // find() succeeds if any substring of the body matches the regex.
      if (matcher.find()) {
        for (int group = 0, count = matcher.groupCount(); group < count; group++) {
          int groupIndex = group + 1; // group indices start at 1
          // Stop once the index exceeds the size of serializers
          // (the List<NameAndSerializer> produced during configuration).
          if (groupIndex > serializers.size()) {
            ....
            break;
          }
          // Fetch the NameAndSerializer for this group.
          NameAndSerializer serializer = serializers.get(group);
          ....
          // Insert headerName (the serializers.x.name setting) and its value into
          // the event headers; the value is first processed by the
          // RegexExtractorInterceptorSerializer.
          headers.put(serializer.headerName,
              serializer.serializer.serialize(matcher.group(groupIndex)));
        }
      }
      return event;
    }

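Because intercept calls matcher.find() rather than matcher.matches(), the regex only has to match somewhere inside the body, which is why an unanchored expression such as /apps/logs/(.*?)/ is sufficient. The sketch below compares the two calls; the class name FindVersusMatches and the sample body are assumptions for illustration only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical comparison of Matcher.find() and Matcher.matches(); not Flume code.
final class FindVersusMatches {
    static final Pattern LOG_TYPE = Pattern.compile("/apps/logs/(.*?)/");

    // Returns group 1 if the pattern occurs anywhere in the body, otherwise null.
    static String firstGroupWithFind(String body) {
        Matcher m = LOG_TYPE.matcher(body);
        return m.find() ? m.group(1) : null;
    }

    // Returns true only if the entire body matches the pattern.
    static boolean wholeBodyMatches(String body) {
        return LOG_TYPE.matcher(body).matches();
    }

    public static void main(String[] args) {
        // Assumed sample line in the filename:hostname:line format.
        String body = "/apps/logs/nginx/access.log:web01:GET /index.html";
        System.out.println(firstGroupWithFind(body));
        System.out.println(wholeBodyMatches(body));
    }
}
```

For the sample body, find() locates the directory name nginx, while matches() reports false because the text after /apps/logs/nginx/ is not covered by the pattern.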
This article is reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1619537. To repost it, please contact the original author.