Implement a Flume Deserializer Plugin to Import XML Files

Background

Flume is an open-source Apache project: a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. This article shows how to import XML files with Flume, covering both the development of a deserializer plugin and the corresponding Flume configuration. We are using Flume 1.5.0 as integrated in MapR.

In our scenario, XML files are periodically synchronized to a directory, and we need to configure a Spooling Directory Source to load these XML files into Flume.

Implement a Flume Deserializer

The default deserializer of Flume's Spooling Directory Source is LineDeserializer, which simply turns each line into a Flume event. In our case, we need to implement a deserializer that parses the XML files according to their structure.

Programming Environment

I prefer Gradle for build automation and Eclipse as the IDE for Java programming. Please make sure you have both installed.

1. Create a folder for the project and create a build.gradle file using the template below.

2. Add the Flume dependencies to build.gradle.

@@ -25,6 +25,8 @@ repositories {
dependencies {
//compile 'commons-collections:commons-collections:3.2'
+ compile 'org.apache.flume:flume-ng-sdk:1.5.0'
+ compile 'org.apache.flume:flume-ng-core:1.5.0'
testCompile 'junit:junit:4.+'
}

3. Initialize source folders and the Eclipse project.

$ gradle initSourceFolders eclipse

4. Import the project into Eclipse, and you can start coding.

Development

Create a custom deserializer that implements the EventDeserializer interface; here we name it MyXMLDeserializer. It reads from a ResettableInputStream and returns a List<Event> from readEvents().

MyXMLDeserializer.java
package me.xingwu.flume.plugins;
import java.io.IOException;
import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventDeserializer;
public class MyXMLDeserializer implements EventDeserializer {
public MyXMLDeserializer() {
// TODO Auto-generated constructor stub
}
@Override
public Event readEvent() throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<Event> readEvents(int numEvents) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public void mark() throws IOException {
// TODO Auto-generated method stub
}
@Override
public void reset() throws IOException {
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
}
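To see how these methods get called, the sketch below imitates in plain Java the way the Spooling Directory Source appears to drive a deserializer: it keeps requesting batches through readEvents() until an empty list signals the end of the file, and calls mark() once a batch has been committed, so that a later reset() only rewinds to that point. BatchSource, ListBatchSource and BatchDriver are hypothetical stand-ins written for this illustration, not Flume classes, and the file handling and error recovery of the real reader are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for the batch methods of Flume's EventDeserializer. */
interface BatchSource<T> {
  List<T> readEvents(int numEvents);
  void mark();
  void reset();
}

/** Toy source serving strings from memory; markPos plays the role of the committed position. */
class ListBatchSource implements BatchSource<String> {
  private final List<String> items;
  private int pos = 0;
  private int markPos = 0;

  ListBatchSource(List<String> items) {
    this.items = items;
  }

  @Override
  public List<String> readEvents(int numEvents) {
    int end = Math.min(pos + numEvents, items.size());
    List<String> batch = new ArrayList<>(items.subList(pos, end));
    pos = end;
    return batch; // empty once everything has been read
  }

  @Override
  public void mark() {
    markPos = pos;
  }

  @Override
  public void reset() {
    pos = markPos; // rewind to the last committed position
  }
}

class BatchDriver {
  /** Requests batches until an empty list signals end of input, marking after each one. */
  static List<List<String>> drain(BatchSource<String> src, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    while (true) {
      List<String> batch = src.readEvents(batchSize);
      if (batch.isEmpty()) {
        break; // end of this input; a spooling reader would move on to the next file
      }
      batches.add(batch); // stand-in for putting the batch on a channel
      src.mark();         // batch committed: a later reset() only rewinds to here
    }
    return batches;
  }

  public static void main(String[] args) {
    ListBatchSource src = new ListBatchSource(Arrays.asList("a", "b", "c"));
    System.out.println(drain(src, 2)); // [[a, b], [c]]
  }
}
```

The empty-list convention is why MyXMLDeserializer must keep returning an empty list once the document is exhausted instead of throwing.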

It is better to read the XML files as a stream with javax.xml.stream.XMLStreamReader than to parse each whole file into an XML object and then extract the events, because streaming keeps memory use bounded regardless of file size. So we first need to wrap the ResettableInputStream in a java.io.InputStream:

package me.xingwu.flume.plugins;

import java.io.IOException;
import java.io.InputStream;
import org.apache.flume.serialization.ResettableInputStream;

/** Adapts Flume's ResettableInputStream to java.io.InputStream so it can feed an XMLStreamReader. */
public class FlumeInputStream extends InputStream {
  private final ResettableInputStream in;

  public FlumeInputStream(ResettableInputStream input) {
    this.in = input;
  }

  @Override
  public int read() throws IOException {
    // Returns the next byte, or -1 at end of stream. I/O errors are propagated:
    // returning 0 on failure would hand the XML parser a bogus NUL byte instead of failing.
    return in.read();
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    // Delegate bulk reads instead of falling back to one read() call per byte.
    return in.read(b, off, len);
  }
}
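With this adapter in place, an XMLStreamReader can consume the spooled file as a stream. For readers new to StAX, here is a minimal JDK-only illustration of the cursor style the deserializer relies on: next() advances one parse event at a time without building a document tree, and getElementText() reads the text of a text-only element and leaves the cursor on its end tag. StaxTextCollector and the sample document are made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

class StaxTextCollector {
  /** Streams through the document and collects the text of every element with the given local name. */
  static List<String> collect(InputStream in, String elementName) throws XMLStreamException {
    XMLStreamReader r = XMLInputFactory.newInstance().createXMLStreamReader(in);
    List<String> values = new ArrayList<>();
    try {
      while (r.hasNext()) {
        // next() moves the cursor to the following parse event (start tag, text, end tag, ...)
        if (r.next() == XMLStreamConstants.START_ELEMENT
            && r.getLocalName().equals(elementName)) {
          // getElementText() consumes the element's text and stops on its END_ELEMENT
          values.add(r.getElementText());
        }
      }
    } finally {
      r.close(); // releases parser resources; does not close the underlying stream
    }
    return values;
  }

  public static void main(String[] args) throws XMLStreamException {
    String xml = "<root><r>1</r><x/><r>2</r></root>";
    InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
    System.out.println(collect(in, "r")); // [1, 2]
  }
}
```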

Now we can start working on the XML parsing and Flume event generation. Be sure to set event headers if you need to route the events to different sinks later.

For details of event routing, please refer to another post.
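The deserializer below fills the headers in its generateHeaders() helper. As a JDK-only sketch of that idea, assuming the mts column holds values like 20140101000000Z, the code turns parallel column-name and value lists into a header map and converts mts into epoch milliseconds under the timestamp key. Flume's HDFS sink, for one, reads the timestamp header when expanding time escape sequences such as %Y in hdfs.path. HeaderSketch and the sample values are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeaderSketch {
  /**
   * Builds a header map from parallel name/value lists. The "mts" column becomes a
   * "timestamp" header in epoch milliseconds; every other column is copied as-is.
   */
  static Map<String, String> headers(List<String> names, List<String> values) {
    Map<String, String> map = new HashMap<>();
    // SimpleDateFormat is not thread-safe, so this sketch creates one per call.
    // The X letter parses an ISO 8601 zone such as "Z" or "+08".
    SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssX");
    for (int i = 0; i < names.size(); i++) {
      String name = names.get(i);
      if (name.equals("mts")) {
        try {
          map.put("timestamp", String.valueOf(fmt.parse(values.get(i)).getTime()));
        } catch (ParseException e) {
          // malformed time: leave the timestamp header unset
        }
      } else {
        map.put(name, values.get(i));
      }
    }
    return map;
  }

  public static void main(String[] args) {
    Map<String, String> h = headers(
        Arrays.asList("neun", "mts"),
        Arrays.asList("node-01", "20140101000000Z"));
    System.out.println(h.get("timestamp")); // 1388534400000
    System.out.println(h.get("neun"));      // node-01
  }
}
```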

The source code of the deserializer is below (I changed some class and variable names, and I can't share the XML structure, but you can get the idea from it):

MyXMLDeserializer.java
package me.xingwu.flume.plugins;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MyXMLDeserializer implements EventDeserializer {
private static final Logger logger = LoggerFactory.getLogger(MyXMLDeserializer.class);
private final ResettableInputStream in;
private XMLStreamReader r;
private final Charset outputCharset;
private final String nullData;
private volatile boolean isOpen;
private int generalColumnsIndex = -1;
private boolean inMdScope = false, inMvScope = false;
private String firstColName = "";
private String moid = "";
private List<String> dataCols;
private List<String> generalCols;
private final List<String> generalColNames;
private static final String MYXML_MD_ELEMENT = "md";
private static final String MYXML_MV_ELEMENT = "mv";
private static final String MYXML_MT_ELEMENT = "mt";
private static final String MYXML_MOID_ELEMENT = "moid";
private static final String MYXML_R_ELEMENT = "r";
public static final String OUT_CHARSET_KEY = "outputCharset";
public static final String CHARSET_DFLT = "UTF-8";
public static final String GENERAL_COLUMNS = "generalColumns";
public static final String GENERAL_COLUMNS_DFLT = "neun,nedn,nesw,mts,gp";
public static final String NULL_DATA = "nullData";
public static final String NULL_DATA_DFLT = "\\N";
MyXMLDeserializer(Context context, ResettableInputStream in) {
this.in = in;
this.isOpen = true;
this.outputCharset = Charset.forName(context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
this.nullData = context.getString(NULL_DATA, NULL_DATA_DFLT);
this.generalColNames = new ArrayList<String>(Arrays.<String> asList(context.getString(
GENERAL_COLUMNS, GENERAL_COLUMNS_DFLT).split(",")));
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
try {
r = inputFactory.createXMLStreamReader(new FlumeInputStream(in));
} catch (XMLStreamException e) {
logger.error("createXMLStreamReader failed: " + e.getMessage());
this.isOpen = false;
}
}
/**
* Reading a single event is not supported for XML streams; use readEvents() instead.
*
* @return always null
* @throws IOException
*/
@Override
public Event readEvent() throws IOException {
logger.error("Reading a single event from an XML stream is not supported! Try readEvents()!");
return null;
}
/**
* Batch read: emits one event per mv element
*
* @param numEvents
* Maximum number of events to return.
* @return List of events, one per mv element; empty once the document is exhausted
* @throws IOException
*/
@Override
public List<Event> readEvents(int numEvents) throws IOException {
List<Event> events = Lists.newLinkedList();
ensureOpen();
String output = "";
try {
int event = r.getEventType();
while (true) {
switch (event) {
case XMLStreamConstants.START_DOCUMENT:
break;
case XMLStreamConstants.START_ELEMENT:
// activate scope
String strEleName = r.getName().getLocalPart();
if (inMdScope) {
if (inMvScope) {
if (strEleName.equals(MYXML_MOID_ELEMENT)) {
moid = r.getElementText();
} else if (strEleName.equals(MYXML_R_ELEMENT)) {
String dataCell = r.getElementText();
if (dataCell == null || dataCell.isEmpty()) {
dataCell = this.nullData;
}
dataCols.add(dataCell);
}
} else if (strEleName.equals(MYXML_MV_ELEMENT)) {
inMvScope = true;
dataCols = new ArrayList<String>();
} else if (strEleName.equals(MYXML_MT_ELEMENT)) {
if (firstColName.isEmpty()) {
firstColName = r.getElementText();
}
} else {
generalColumnsIndex = generalColNames.indexOf(strEleName);
if (generalColumnsIndex >= 0
&& generalColumnsIndex < generalColNames.size()) {
generalCols.set(generalColumnsIndex, r.getElementText());
}
}
} else if (strEleName.equals(MYXML_MD_ELEMENT)) {
inMdScope = true;
// initialize general columns for each md
generalCols = new ArrayList<String>(generalColNames);
}
break;
case XMLStreamConstants.CHARACTERS:
break;
case XMLStreamConstants.END_ELEMENT:
// close scope
String strEndEle = r.getName().getLocalPart();
if (strEndEle.equals(MYXML_MD_ELEMENT)) {
inMdScope = false;
firstColName = "";
} else if (strEndEle.equals(MYXML_MV_ELEMENT)) {
inMvScope = false;
// build event
output = Joiner.on("|").useForNull(this.nullData).join(generalCols) + "|"
+ moid + "|"
+ Joiner.on("|").useForNull(this.nullData).join(dataCols);
Event objEvent = EventBuilder.withBody(output, outputCharset);
Map<String, String> headers = generateHeaders(generalColNames, generalCols);
headers.put("firstCol", firstColName);
objEvent.setHeaders(headers);
events.add(objEvent);
moid = "";
}
break;
case XMLStreamConstants.END_DOCUMENT:
break;
}
if (!r.hasNext())
break;
event = r.next();
if (events.size() >= numEvents) {
break;
}
}
} catch (XMLStreamException e) {
logger.error(e.getMessage());
}
return events;
}
@Override
public void mark() throws IOException {
ensureOpen();
in.mark();
}
@Override
public void reset() throws IOException {
ensureOpen();
in.reset();
}
@Override
public void close() throws IOException {
if (isOpen) {
reset();
in.close();
try {
r.close();
} catch (XMLStreamException e) {
logger.error("XML close error: " + e.getMessage());
}
isOpen = false;
}
}
private Map<String, String> generateHeaders(List<String> keys, List<String> values) {
Map<String, String> map = Maps.newHashMap();
for (int i = 0; i < keys.size(); i++) {
if (keys.get(i).equals("mts")) {
Date d;
try {
d = (new SimpleDateFormat("yyyyMMddHHmmssX").parse(values.get(i)));
map.put("timestamp", String.valueOf(d.getTime()));
} catch (ParseException e) {
logger.error("Parse timestamp error: " + e.getMessage());
}
} else {
map.put(keys.get(i), values.get(i));
}
}
return map;
}
private void ensureOpen() {
if (!isOpen) {
throw new IllegalStateException("Deserializer has been closed");
}
}
public static class Builder implements EventDeserializer.Builder {
@Override
public EventDeserializer build(Context context, ResettableInputStream in) {
return new MyXMLDeserializer(context, in);
}
}
}
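Since the XML structure can't be shared, the following JDK-only sketch reproduces the md/mv scoping of readEvents() against a small hypothetical document whose shape is only inferred from the element names in the listing (md, mv, moid, r and the general columns). It leaves out the mt handling, headers and charset options, and, unlike the listing, it starts each md with the null marker for every general column so that a missing column comes out as \N. Because the XMLStreamReader is kept in a field, each call resumes where the previous batch stopped, which is what lets readEvents(numEvents) deliver one file in several batches. MdMvBatchParser and readLines() are names made up for this sketch.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/** One pipe-delimited line per <mv>: md-level general columns, then moid, then the r cells. */
class MdMvBatchParser {
  private static final String NULL_DATA = "\\N";
  private final XMLStreamReader r;
  private final List<String> generalNames;
  private List<String> general;
  private List<String> data;
  private String moid = "";
  private boolean inMd, inMv;

  MdMvBatchParser(String xml, List<String> generalNames) throws XMLStreamException {
    this.r = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
    this.generalNames = generalNames;
  }

  /** Returns at most n lines; an empty list means the document is exhausted. */
  List<String> readLines(int n) throws XMLStreamException {
    List<String> out = new ArrayList<>();
    while (out.size() < n && r.hasNext()) {
      int ev = r.next();
      if (ev == XMLStreamConstants.START_ELEMENT) {
        String name = r.getLocalName();
        if (inMv) {
          if (name.equals("moid")) {
            moid = r.getElementText();
          } else if (name.equals("r")) {
            String cell = r.getElementText();
            data.add(cell.isEmpty() ? NULL_DATA : cell); // empty cell -> null marker
          }
        } else if (name.equals("md")) {
          inMd = true;
          general = new ArrayList<>(Collections.nCopies(generalNames.size(), NULL_DATA));
        } else if (inMd && name.equals("mv")) {
          inMv = true;
          data = new ArrayList<>();
        } else if (inMd) {
          int idx = generalNames.indexOf(name);
          if (idx >= 0) {
            general.set(idx, r.getElementText());
          }
        }
      } else if (ev == XMLStreamConstants.END_ELEMENT) {
        String name = r.getLocalName();
        if (name.equals("mv")) {
          inMv = false;
          out.add(String.join("|", general) + "|" + moid + "|" + String.join("|", data));
          moid = "";
        } else if (name.equals("md")) {
          inMd = false;
        }
      }
    }
    return out;
  }

  public static void main(String[] args) throws XMLStreamException {
    // Hypothetical document shape, inferred from the element names used in MyXMLDeserializer
    String xml = "<mdc><md><neun>NE1</neun><mts>20140101000000Z</mts>"
        + "<mv><moid>obj-1</moid><r>10</r><r>20</r></mv>"
        + "<mv><moid>obj-2</moid><r>30</r><r></r></mv></md></mdc>";
    MdMvBatchParser p = new MdMvBatchParser(xml, Arrays.asList("neun", "mts"));
    System.out.println(p.readLines(1)); // [NE1|20140101000000Z|obj-1|10|20]
    System.out.println(p.readLines(5)); // [NE1|20140101000000Z|obj-2|30|\N]
    System.out.println(p.readLines(5)); // []
  }
}
```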

Unit Test

We can write a unit test and run it as a JUnit test:

MyXMLDeserializerTest.java
package me.xingwu.flume.plugins;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.DurablePositionTracker;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.PositionTracker;
import org.apache.flume.serialization.ResettableFileInputStream;
import org.apache.flume.serialization.ResettableInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import me.xingwu.flume.plugins.MyXMLDeserializer;
public class MyXMLDeserializerTest {
private static final boolean CLEANUP = true;
private static final Logger logger = LoggerFactory.getLogger(MyXMLDeserializerTest.class);
private File file, meta;
@Before
public void setup() throws Exception {
file = new File("target/test2.xml").getAbsoluteFile();
logger.info("Data file: {}", file);
meta = File.createTempFile("test", ".avro");
logger.info("PositionTracker meta file: {}", meta);
meta.delete(); // We want the filename but not the empty file
}
@After
public void tearDown() throws Exception {
if (CLEANUP) {
meta.delete();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void testReadEvents() throws IOException {
PositionTracker tracker = DurablePositionTracker.getInstance(meta, file.getPath());
ResettableInputStream in = new ResettableFileInputStream(file, tracker);
List<Event> events;
EventDeserializer des = new MyXMLDeserializer(new Context(), in);
events = des.readEvents(650);
System.out.println("Event: " + events.size());
for (Event e : events) {
System.out.println("Event: <" + e.getHeaders().get("firstCol") + ">"
+ new String(e.getBody()));
}
des.close();
}
}

Build and Deployment

Run gradle build, and you will find the jar at build/libs/the-package-name-0.1.0.jar. You can change the jar name and version in build.gradle.

For deployment, simply put the jar in $FLUME_HOME/plugins.d/ANY_NAME_YOU_LIKE/lib/ and configure Flume as follows:

# Source
agent.sources = helloXMLs
agent.sources.helloXMLs.type = spooldir
agent.sources.helloXMLs.spoolDir = /opt/data/spooltest
agent.sources.helloXMLs.deletePolicy = immediate
agent.sources.helloXMLs.deserializer = me.xingwu.flume.plugins.MyXMLDeserializer$Builder

Make sure you append $Builder (the name of the nested Builder class) to your class name; otherwise you will get an error like this:

org.apache.flume.FlumeException: Unable to instantiate Builder from me.xingwu.flume.plugins.MyXMLDeserializer: does not appear to implement org.apache.flume.serialization.EventDeserializer$Builder
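The suffix matters because, as far as I can tell, Flume loads the deserializer setting with Class.forName() and then checks that the loaded class implements EventDeserializer.Builder; a nested class is addressed by its binary name, Outer$Inner. The plain-JDK sketch below mimics that check, with a hypothetical BuilderContract interface standing in for EventDeserializer.Builder and BinaryNameDemo standing in for MyXMLDeserializer.

```java
class BinaryNameDemo {
  /** Hypothetical stand-in for EventDeserializer.Builder. */
  interface BuilderContract {}

  /** Nested class, analogous to MyXMLDeserializer.Builder. */
  static class Builder implements BuilderContract {}

  /** Loads a class by name and checks that it implements the builder contract. */
  static boolean loadsAsBuilder(String className) {
    try {
      Class<?> c = Class.forName(className);
      return BuilderContract.class.isAssignableFrom(c);
    } catch (ClassNotFoundException e) {
      return false; // e.g. a misspelled suffix such as $Build
    }
  }

  public static void main(String[] args) {
    String outer = BinaryNameDemo.class.getName();
    System.out.println(Builder.class.getName());            // binary name ends with $Builder
    System.out.println(loadsAsBuilder(outer + "$Builder")); // true
    System.out.println(loadsAsBuilder(outer));              // false: the outer class is not a builder
    System.out.println(loadsAsBuilder(outer + "$Build"));   // false: no such class
  }
}
```

The third case corresponds to the error above: the outer class loads fine but does not implement the builder interface.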

Now the XML source is set up! Configure channels and sinks for Flume and enjoy! Leave a comment if you have any questions :)
