I’m using Apache Spark Streaming 1.2.0 and trying to define a file-name filter when creating an InputDStream by invoking the fileStream method. My code works perfectly fine when I don’t use a file filter, e.g. when invoking the other fileStream method (described here).
According to the documentation of the fileStream method, I can pass it a filter of type
scala.Function1<org.apache.hadoop.fs.Path,Object> filter
But so far, I have not been able to create a working fileFilter. My initial attempts:
1- I tried to implement it as:
Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
    @Override
    public Object apply(Path v1) {
        return true;
    }

    @Override
    public <A> Function1<A, Object> compose(Function1<A, Path> g) {
        return Function1$class.compose(this, g);
    }

    @Override
    public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
        return Function1$class.andThen(this, g);
    }
};
But apparently my implementation of andThen is wrong, and I couldn’t figure out how I should implement it. The compiler complains that the anonymous class

is not abstract and does not override abstract method <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in scala.Function1
2- I tried to implement it as:
Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
    @Override
    public Object apply(Path v1) {
        return true;
    }
};
This one compiles, but when I run it I get an exception:
2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1
java.io.NotSerializableException: myModule$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:263)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:74)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Any ideas how I can implement a fileFilter that I can pass to the fileStream method, so that Spark Streaming processes only the file name patterns I want?
Answer
I had to create another file named FileFilter.java:
import org.apache.hadoop.fs.Path;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;

public class FileFilter extends AbstractFunction1<Path, Object> implements Serializable {
    @Override
    public Object apply(Path v1) {
        // Accept only files whose names end with ".json".
        if (v1.toString().endsWith(".json")) {
            return Boolean.TRUE;
        } else {
            return Boolean.FALSE;
        }
    }
}
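Two details appear to matter here: extending AbstractFunction1 means Scala supplies compose, andThen, and the specialized helper methods, so only apply has to be overridden; and because FileFilter is a named top-level class that implements Serializable, there is no enclosing instance (like myModule$1 above) to drag into checkpoint serialization. If you need the same pattern for other extensions, a parameterized variant along these lines should also work (SuffixFilter is an illustrative name of mine, not part of any Spark API):

import org.apache.hadoop.fs.Path;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;

// Illustrative sketch: a reusable filter whose suffix is set at construction.
public class SuffixFilter extends AbstractFunction1<Path, Object> implements Serializable {
    private final String suffix;

    public SuffixFilter(String suffix) {
        this.suffix = suffix;
    }

    @Override
    public Object apply(Path path) {
        // The boolean result is autoboxed to Boolean, which satisfies the
        // Object return type of scala.Function1<Path, Object>.
        return path.getName().endsWith(suffix);
    }
}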
And then I pass an instance of FileFilter to the fileStream method, as in:
fileStream(inDirectory, new FileFilter(), false, ...)
And it worked without any problems.
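For completeness, my reading of the elided arguments above: fileStream on the Scala StreamingContext takes implicit ClassTag evidence for the key, value, and input-format types, and these become trailing explicit arguments when the method is called from Java. A sketch of what a full call could look like for plain text files follows; jssc and inDirectory are assumed names, and the exact 1.2.0 parameter order should be verified against the API docs:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

// jssc is assumed to be an existing JavaStreamingContext; ssc() exposes the
// underlying Scala StreamingContext, whose fileStream accepts the filter.
StreamingContext ssc = jssc.ssc();

// Scala's implicit ClassTag evidence is passed explicitly from Java.
ClassTag<LongWritable> keyTag = ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> valueTag = ClassTag$.MODULE$.apply(Text.class);
ClassTag<TextInputFormat> formatTag = ClassTag$.MODULE$.apply(TextInputFormat.class);

InputDStream<Tuple2<LongWritable, Text>> stream =
    ssc.fileStream(inDirectory, new FileFilter(), false, keyTag, valueTag, formatTag);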