This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Adding entity resolution feature to SANSA-ML. #24

Open
wants to merge 10 commits into
base: develop

Conversation

amrit1210

The feature uses a MinHash LSH-based approach to identify and link similar entities in two datasets.
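
For reference, a minimal sketch of the general idea with Spark ML (column names, the tokenisation and the threshold are illustrative assumptions, not the exact code in this PR):

import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, Tokenizer}
import org.apache.spark.sql.DataFrame

// Vectorise the entity profiles of both datasets and join them by approximate Jaccard similarity.
def linkEntities(dsA: DataFrame, dsB: DataFrame, threshold: Double): DataFrame = {
  val tokenizer = new Tokenizer().setInputCol("profile").setOutputCol("tokens")
  val hashingTF = new HashingTF().setInputCol("tokens").setOutputCol("features").setNumFeatures(1 << 18)
  val lsh = new MinHashLSH().setInputCol("features").setOutputCol("hashes").setNumHashTables(3)

  val featuresA = hashingTF.transform(tokenizer.transform(dsA))
  val featuresB = hashingTF.transform(tokenizer.transform(dsB))
  val model = lsh.fit(featuresA)

  // Pairs whose Jaccard distance is below the threshold are candidate entity matches.
  model.approxSimilarityJoin(featuresA, featuresB, threshold, "JaccardDistance").toDF()
}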

@GezimSejdiu GezimSejdiu self-requested a review November 17, 2019 20:32
Member

@GezimSejdiu GezimSejdiu left a comment

Hi @amrit1210 ,

thanks a lot for your contributions. I just went through it and added some comments. Please, try to resolve them or add your comments so we can discuss them in more detail. In particular, do:

  1. Please note that we follow the official Scala style guide and your code should be adapted accordingly (see my comments on the review). If you want to validate it, just run mvn scalastyle:check; if that passes, you are okay to push the changes.
  2. Try to write code once. There are many duplicates between the two classes, and it would be better to write reusable, generic code and/or call existing methods instead.
  3. No unit tests provided. We should have some basic unit tests where the individual units/components are tested.

Looking forward to resolving those issues and moving forward with merging the PR.

Best regards,


// Compare the predicates for matched entities by subject
val refined_data_pred = get_similar_predicates(ds_subjects)
val ds_predicates = refined_data_pred.repartition(400).persist(StorageLevel.MEMORY_AND_DISK)
Member

Same here. We should remove any hard-coded values.
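
A possible shape for this (a sketch; the parameter name numPartitions is hypothetical):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Take the partition count as a parameter (with a default) instead of the literal 400.
def persistRepartitioned[T](rdd: RDD[T], numPartitions: Int = 400): RDD[T] =
  rdd.repartition(numPartitions).persist(StorageLevel.MEMORY_AND_DISK)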

def get_entity_profiles(triplesRDD: RDD[Triple]): RDD[(String, String)] = {

//predicates to be filtered out from triples
val removePredicates: List[String] = List("owl:sameas", "wikiPageID", "wikiPageRevisionID", "wikiPageRevisionLink",
Member

I see that this is quite static. What will happen if another dataset is used? Can we move these to a specific configuration file, or even an input (parameter, file) which allows users to define the predicates they want to filter out? We could make it even more extensible by allowing users to provide them as RDF triples (we could define a simple RDF graph which contains the predicates to filter out, but that part is optional: the RDF graph, not moving them out of the hard-coded variable).
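One possible direction (a sketch under the assumption that the predicates live in a plain-text resource, one per line; the path and method name are hypothetical):

import org.apache.spark.sql.SparkSession

// Load the predicates to filter out from a user-supplied file instead of a hard-coded List.
def loadRemovePredicates(spark: SparkSession, path: String): Set[String] =
  spark.sparkContext.textFile(path)
    .map(_.trim.toLowerCase)
    .filter(_.nonEmpty)
    .collect()
    .toSet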


//Get subject data and tokenise it
val ent_sub1 = entites_RDD1.map(f => { (f._1, f._1.split("_")) })
val part_rdd1 = new RangePartitioner(400, ent_sub1)
Member

Again, this number is quite static, and we should find a way to define it (if it is needed at all).
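
One way to avoid the literal (a sketch reusing spark and ent_sub1 from the snippet above; the multiplier is an assumption):

import org.apache.spark.RangePartitioner

// Derive the partition count from the cluster configuration rather than hard-coding 400.
val numPartitions = spark.sparkContext.defaultParallelism * 4
val part_rdd1 = new RangePartitioner(numPartitions, ent_sub1)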


//Calculate TruePositives for precision, recall and f1-measure
val truePositives = actual_rdd.intersection(predicted_rdd).count
println("***************************************************************************************")
Member

No println statements in the API. We should move them to SANSA-Examples.
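
A minimal sketch of what that could look like (names are hypothetical): the API returns the numbers, and the example code decides how to report them.

import org.apache.spark.rdd.RDD

case class EvaluationResult(precision: Double, recall: Double, f1: Double)

// Compute the metrics and hand them back to the caller instead of printing them.
def evaluate(actual: RDD[(String, String)], predicted: RDD[(String, String)]): EvaluationResult = {
  val truePositives = actual.intersection(predicted).count().toDouble
  val predictedCount = predicted.count().toDouble
  val actualCount = actual.count().toDouble
  val precision = if (predictedCount == 0) 0.0 else truePositives / predictedCount
  val recall = if (actualCount == 0) 0.0 else truePositives / actualCount
  val f1 = if (precision + recall == 0) 0.0 else 2 * precision * recall / (precision + recall)
  EvaluationResult(precision, recall, f1)
}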

Member

Do the same for the rest as well.

* Dataset1 has 47 million entities, approx 19GB
* Dataset2 has 19 million entities, approx 5.8GB
* */
class EntityResolution_RDFData_HashingTF(spark: SparkSession, triplesRDD1: RDD[Triple], triplesRDD2: RDD[Triple],
Member

Same goes here (regarding the scala-style naming conventions).

* where subject is the key and the group of paired predicate:object forms the value
* ex:- (Budapest_City, areaCode:1 , country:Hungary)
*/
def get_entity_profiles(triplesRDD: RDD[Triple]): RDD[(String, String)] = {
Member

Is this method any different from the one in the class above? If not, please consider writing the code once and reusing it where needed. I suggest that you merge them into e.g. a Utils class or similar and do not keep two identical code snippets in two different classes of the same package.
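
A sketch of the suggested structure (the object name, the blacklist parameter and the use of local names are assumptions based on the snippets above):

import org.apache.jena.graph.Triple
import org.apache.spark.rdd.RDD

// Shared once in a utility object; both LSH variants call this instead of duplicating it.
object EntityProfiles {
  // Build (subject, "pred1:obj1 , pred2:obj2 ...") profiles, dropping blacklisted predicates.
  // Assumes URI subjects; literal objects contribute their lexical form.
  def getEntityProfiles(triples: RDD[Triple], removePredicates: Set[String]): RDD[(String, String)] =
    triples
      .filter(t => !removePredicates.contains(t.getPredicate.getLocalName.toLowerCase))
      .map { t =>
        val obj =
          if (t.getObject.isLiteral) t.getObject.getLiteralLexicalForm
          else t.getObject.getLocalName
        (t.getSubject.getLocalName, t.getPredicate.getLocalName + ":" + obj)
      }
      .reduceByKey(_ + " , " + _)
}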

Member

And I see that there are other methods which are the same in both classes. Please merge them and define them only once. I will not comment on similar methods twice; consider the messages above to apply to those methods as well.

Member

@GezimSejdiu GezimSejdiu left a comment

Hi @amrit1210 ,

many thanks for your updates and contributions. Please, go through my comments and try to resolve them.
Important points to be considered:

  • make it Scala-style compliant
  • move files from the main package into resources
  • provide basic unit tests and move the main method out into SANSA-Examples
  • the other comments

Best regards,

teacher: DataFrame, threshold_subject: Double, jsimilarity_predicate: Double,
threshold_object: Double, vocab_size: Long, output_path: String) extends Serializable {
abstract class Commons(val spark: SparkSession, val sourceData1: RDD[Triple], val sourceData2: RDD[Triple],
val teacher: DataFrame, val thresholdSubject: Double, val jsimilarityPredicate: Double,
Member

Why teacher? Are we making this generic enough to accept any other (RDF) dataset? I think we should remove such a misleading name. Of course, in your use case it makes total sense, but here we should keep it more generic, as we aim to provide it as an API within SANSA.

Author

teacher is basically the labelled data against which we compare our predictions later on to evaluate recall and precision. Yes, our algorithm can accept any other RDF dataset, but in order to evaluate how well the entities are resolved, we need the gold standard, i.e. teacher in our case. I can change it to labels if you suggest.

Member

Right. I don't think you should include that DF as part of the algorithm. If the validation data is used just for evaluation and not for training, we should generalize it to accept any validation data. So, better to remove it from the main algorithm and just add it when running the example.
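
A sketch of that split (names and signatures are hypothetical): the resolver only produces matches, and any labelled dataset is only touched by an evaluation step in the example/driver code.

import org.apache.jena.graph.Triple
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// The resolver itself no longer knows about a gold standard.
class EntityResolver(data1: RDD[Triple], data2: RDD[Triple] /*, thresholds, ... */) extends Serializable {
  def run(): RDD[(String, String)] = ???
}

// Evaluation against any labelled data happens outside the algorithm, e.g. in SANSA-Examples.
def countTruePositives(predicted: RDD[(String, String)], goldStandard: DataFrame): Long = {
  val actual = goldStandard.rdd.map(row => (row.getString(0), row.getString(1)))
  actual.intersection(predicted).count()
}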

class EntityResolution_RDFData_CountVectorizer(spark: SparkSession, triplesRDD1: RDD[Triple], triplesRDD2: RDD[Triple],
teacher: DataFrame, threshold_subject: Double, jsimilarity_predicate: Double,
threshold_object: Double, vocab_size: Long, output_path: String) extends Serializable {
abstract class Commons(val spark: SparkSession, val sourceData1: RDD[Triple], val sourceData2: RDD[Triple],
Member

Why Commons? And why abstract? To my understanding, the abstract methods of an abstract class are those methods that do not contain any implementation, and in your case your methods are fully implemented. I would suggest that you create a utils package which then contains such methods (grouped by their functionality into separate Scala objects).

Author

The Commons class contains the abstract method vectorise, which has separate implementations in the two subclasses that inherit from it. That's why it is structured in this way. If you still think I should do it in some other, better way, please suggest.
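
For reference, a minimal sketch of the structure described here (signatures and column names are assumptions, not the exact code in this PR):

import org.apache.spark.ml.feature.{CountVectorizer, HashingTF}
import org.apache.spark.sql.DataFrame

abstract class Commons extends Serializable {
  // Only this vectorisation step differs between the two variants.
  def vectorise(tokenised: DataFrame): DataFrame
}

class WithHashingTF extends Commons {
  override def vectorise(tokenised: DataFrame): DataFrame =
    new HashingTF().setInputCol("tokens").setOutputCol("features").transform(tokenised)
}

class WithCountVectorizer extends Commons {
  override def vectorise(tokenised: DataFrame): DataFrame =
    new CountVectorizer().setInputCol("tokens").setOutputCol("features")
      .fit(tokenised)
      .transform(tokenised)
}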

Member

Not sure, but feel free to keep it as you have it if you think that is the right way to do it. Later, we can refactor it if needed.

@@ -35,22 +39,22 @@ class EntityResolution_RDFData_CountVectorizer(spark: SparkSession, triplesRDD1:
def run(): RDD[(String, String)] = {
Member

As I already mentioned in my previous review, we should get rid of any main method in the API. We could add unit tests which cover some of the functionality (by the way, I do not see any unit tests here; we usually should have at least some coverage of the functionality before we merge the PR), and we should consider moving the main run into the ml examples in SANSA-Examples.

Author

This method is required in order to accomplish the 3-step procedure of our algorithm, as the output from each step becomes the input for the next in a sequential process. We are including unit tests separately.

@@ -71,16 +75,19 @@ class EntityResolution_RDFData_CountVectorizer(spark: SparkSession, triplesRDD1:
* where subject is the key and the group of paired predicate:object forms the value
* ex:- (Budapest_City, areaCode:1 , country:Hungary)
*/
def get_entity_profiles(triplesRDD: RDD[Triple]): RDD[(String, String)] = {
def getEntityProfiles(sourceData: RDD[Triple]): RDD[(String, String)] = {

//predicates to be filtered out from triples
Member

Not sure if you ran mvn scalastyle:check before you pushed the changes, but from what I can see, Scala Style Check will complain about this one (see here: http://www.scalastyle.org/rules-dev.html#org_scalastyle_scalariform_SpaceAfterCommentStartChecker).
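
The rule in question flags comments with no space after the comment start, e.g.:

//predicates to be filtered out from triples   // flagged by SpaceAfterCommentStartChecker
// predicates to be filtered out from triples  // passes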

Member

Do the same for the rest!

Author

Done.

@GezimSejdiu GezimSejdiu added the ML label Jan 15, 2020
@GezimSejdiu GezimSejdiu added this to the 0.8 milestone Jan 15, 2020
@GezimSejdiu
Member

Hi @amrit1210 ,

Any news about my comments? It would be good if we could resolve some of them and move the PR forward.

Many thanks in advance for your valuable contribution.

Best regards,

@amrit1210
Author

amrit1210 commented Feb 12, 2020 via email

@amrit1210
Author

Dear Gezim,

On top of the last patch, I have added some more corrections. Please consider both commits for the complete set of changes.

Kind Regards,

@GezimSejdiu
Member

Hi @amrit1210 ,

once more, thanks a lot for your contribution.
I just ran the PR locally and it seems to fail in one of the test cases. I'm getting:

ERTests:
net.sansa_stack.ml.spark.entity_resolution.ERTests *** ABORTED ***
  java.lang.RuntimeException: Unable to load a Suite class that was discovered in the runpath: net.sansa_stack.ml.spark.entity_resolution.ERTests
  at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:81)
  at org.scalatest.tools.DiscoverySuite$$anonfun$1.apply(DiscoverySuite.scala:38)
  at org.scalatest.tools.DiscoverySuite$$anonfun$1.apply(DiscoverySuite.scala:37)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  ...
  Cause: java.lang.NullPointerException:
  at net.sansa_stack.rdf.spark.io.NTripleReader$.load(NTripleReader.scala:112)
  at net.sansa_stack.rdf.spark.io.package$RDFReader$$anonfun$ntriples$4.apply(package.scala:217)
  at net.sansa_stack.rdf.spark.io.package$RDFReader$$anonfun$ntriples$4.apply(package.scala:216)
  at net.sansa_stack.ml.spark.entity_resolution.ERTests.<init>(ERTests.scala:17)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at java.lang.Class.newInstance(Class.java:442)
  at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:66)
  ...

Would it be possible for you to double-check that test case(s) via mvn test and see what is wrong there?

Best regards,

@amrit1210
Author

amrit1210 commented Feb 19, 2020 via email

@amrit1210
Author

Hi Gezim,

I have rechecked it again, but I am not sure about the null session. Can you please check that?

Thanks and Regards,
Amrit Kaur

@GezimSejdiu
Member

Hi @amrit1210 ,
so I just did a clean-up and fixed the issue with the null value for the SparkSession, but now it is throwing another exception w.r.t. the datatypes. See the trace below:

20/03/21 00:25:51 ERROR Executor: Exception in task 16.0 in stage 3.0 (TID 76)
org.apache.jena.datatypes.DatatypeFormatException: Lexical form '1967-4-4' is not a legal instance of Datatype[http://www.w3.org/2001/XMLSchema#date] Lexical form '1967-4-4' is not a legal instance of Datatype[http://www.w3.org/2001/XMLSchema#date] during parse -org.apache.jena.ext.xerces.impl.dv.InvalidDatatypeValueException: cvc-datatype-valid.1.2.1: '1967-4-4' is not a valid value for 'date'.
	at org.apache.jena.graph.impl.LiteralLabelImpl.getValue(LiteralLabelImpl.java:338)
	at org.apache.jena.graph.Node_Literal.getLiteralValue(Node_Literal.java:44)
	at net.sansa_stack.ml.spark.entity_resolution.Commons$$anonfun$3.apply(Commons.scala:91)
	at net.sansa_stack.ml.spark.entity_resolution.Commons$$anonfun$3.apply(Commons.scala:83)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/03/21 00:25:52 ERROR TaskSetManager: Task 16 in stage 3.0 failed 1 times; aborting job

Please, could you double-check whether the data you are giving as input (if you have tested them before) are valid, or whether we have to change something in the transformation (i.e. where you are getting the datatypes of the literals)?

Best regards,

@amrit1210
Author

Hi Gezim,

The issue is fixed. Tested.

Regards,
Amrit

@@ -88,7 +88,7 @@ abstract class Commons(val spark: SparkSession, val sourceData1: RDD[Triple], va
val value = pred + ":" + obj // predicate and object are separated by ':'
(key, value)
} else {
val obj = f.getObject.getLiteralValue.toString()
val obj = f.getObject.getLiteral.toString().split(Array('^', '@')).head.trim()
Member

Would it be better if we got the lexical form of the literal, which would remove everything, i.e. the language tag, datatype, etc.? Or is this method meant for something else?

Author

Yes, it does the same.
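
For reference, a sketch of the lexical-form variant being discussed (assuming f is a Jena Triple whose object may be a literal):

import org.apache.jena.graph.Triple

// getLiteralLexicalForm returns the raw lexical value without language tag or datatype,
// and does not try to parse the value (so a malformed xsd:date no longer throws).
def objectValue(f: Triple): String =
  if (f.getObject.isLiteral) f.getObject.getLiteralLexicalForm
  else f.getObject.getLocalName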

Member

@LorenzBuehmann LorenzBuehmann left a comment

I would not accept this PR in its current state (but maybe it's sufficient for you and the project? I wasn't involved in this algorithm, so I don't know the requirements you defined; I just know that "working on the DBpedia dataset" doesn't mean anything in general)

  1. for me, the whole algorithm is basically a black box. I know neither what the input nor what the output is supposed to be, so it's hard to judge whether it's doing what it is supposed to do
  2. there is a bunch of params that are not documented, so it's hard to understand their purpose, like having two threshold params for subject and object, or two config params for the number of partitions and repartitioning

The most critical points:

  • The whole concept of using URIs as identifiers is ignored by just keeping the local name during runtime. This might be ok at first glance, but a property with the same local name in a different namespace can have a totally different meaning.
  • Moreover, the result of the whole pipeline is also just a tuple of two strings which are just the local names, so you won't have a direct mapping back to the URIs of the original data. This again is error-prone.
  • Also, the blacklist of predicates considered during computation is basically hard-coded for DBpedia.
  • there are some (in my opinion) inefficient parts in the code, like doing (pseudo code; a runnable Scala version is sketched after this list)

triplesToRemove = triples.filter(t => t.p in blacklist)
triplesToKeep = triples.subtract(triplesToRemove)

instead of just negating the filter, e.g.

triplesToKeep = triples.filter(t => t.p **not** in blacklist)
  • is it really necessary to use string operations back and forth? Like e = "p:o" where later you do e.split(":") to get p and/or o. Wouldn't it be easier to keep tuples here? Or do you need the string "p:o" for something important?
  • many Scaladoc params do not match the method signature (anymore)
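
A runnable Scala version of the negated-filter suggestion above (variable and method names are assumptions):

import org.apache.jena.graph.Triple
import org.apache.spark.rdd.RDD

// Single pass over the data: keep only triples whose predicate is not blacklisted,
// instead of materialising the triples to remove and then calling subtract.
def filterTriples(triples: RDD[Triple], blacklist: Set[String]): RDD[Triple] =
  triples.filter(t => !blacklist.contains(t.getPredicate.getLocalName))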

@JensLehmann JensLehmann removed this from the 0.8 milestone Jul 2, 2020
@JensLehmann JensLehmann added this to the 0.9 milestone Jul 2, 2020