
[WIP] Prepare for Spark 4 #394

Open

newfront wants to merge 1 commit into master

Conversation

newfront

This PR adds support for Spark 4.0.0 (currently preview2), along with a major Jedis upgrade (3.9 → 5.x) and a migration to Scala 2.13 and Java 17.

Open Questions

The logic in getClusterNodes changed to reflect the new Jedis conn.clusterShards() command. While the unit tests all pass, I am unsure of the correct indexing for idx. The comment specifies that it is used for non-master nodes, yet node.getRole() returns master for all nodes, which makes sense given that everything is running on 127.0.0.1.

With that said, I could add an AtomicInteger to ensure the idx field has monotonically increasing values, but I wanted to understand more about how these values are used before moving forward (hence the WIP status).

  private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
    val conn = initialHost.connect()

    val res = conn.clusterShards().asScala.flatMap { (shardInfoObj: ClusterShardInfo) =>
      val slotInfo = shardInfoObj.getSlots

      // todo: Can we have more than one node per ClusterShard?
      val nodeInfo = shardInfoObj.getNodes.get(0)

      /*
       * We collect all the nodes with the slot range [sPos, ePos] and
       * create a RedisNode for each of them. The total field of every
       * RedisNode is the number of nodes whose slot range is as above,
       * and the idx field is an index for each node that will be used
       * to add support for replicas and so on. The idx of a master is
       * always 0; we rely on this fact to filter masters.
       */
      (0 until slotInfo.size).map { i =>
        val host = SafeEncoder.encode(nodeInfo.getIp.getBytes(Charset.forName("UTF-8")))
        val port = nodeInfo.getPort.toInt
        val slotStart = slotInfo.get(i).get(0).toInt
        val slotEnd = slotInfo.get(i).get(1).toInt
        val endpoint = RedisEndpoint(
          host = host,
          port = port,
          user = initialHost.user,
          auth = initialHost.auth,
          dbNum = initialHost.dbNum,
          timeout = initialHost.timeout,
          ssl = initialHost.ssl)
        val role = nodeInfo.getRole
        val idx = if (role == "master") 0 else i
        RedisNode(endpoint, slotStart, slotEnd, idx, slotInfo.size)
      }
    }.toArray
    conn.close()
    res
  }
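One possible answer to the idx question above, sketched with a hypothetical stand-in Node case class rather than Jedis's ClusterShardNodeInfo (illustrative only, not the PR's code): derive idx from each node's position within its shard, master first, instead of reusing the slot-range index i or sharing an AtomicInteger.

```scala
// Illustrative sketch only: `Node` is a hypothetical stand-in for Jedis's
// ClusterShardNodeInfo; field names are assumptions.
case class Node(ip: String, port: Int, role: String)

def indexNodes(nodes: Seq[Node]): Seq[(Node, Int)] = {
  val (masters, replicas) = nodes.partition(_.role == "master")
  // Master first, so it always receives idx 0 (the invariant the comment
  // in getClusterNodes relies on); replicas get 1, 2, ... in order.
  (masters ++ replicas).zipWithIndex
}

val shard = Seq(
  Node("127.0.0.1", 7001, "replica"),
  Node("127.0.0.1", 7000, "master"))
indexNodes(shard).foreach(println)
```

This would avoid shared mutable state entirely: idx is computed per shard from node order, so no AtomicInteger is needed.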

@newfront (Author)

Thanks in advance.

@newfront (Author)

The other thing to mention is that in Spark 4 the older DStream receivers are deprecated, so I removed all traces of the non-Structured-Streaming code paths.

@zen-data

The biggest change here is the RedisNode index (idx) in the cluster getClusterNodes function. Here is the object for clarity.

[screenshot: redis_cluster-shard-info]

This is one item in the List[ClusterShardInfo] that is read in the flatMap operation shown above.
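Since a ClusterShardInfo can list a master plus its replicas under one shard, the todo in the PR ("Can we have more than 1 node per ClusterShard?") could be handled by emitting one entry per (slot range, node) pair rather than reading only getNodes.get(0). A minimal sketch with stand-in types (not the actual Jedis classes):

```scala
// Stand-in types for the Jedis objects discussed above; illustrative only.
case class ShardNode(ip: String, port: Int, role: String)
case class Shard(slots: Seq[(Int, Int)], nodes: Seq[ShardNode])

// Every node in a shard serves the same slot ranges, so the cross product
// of slot ranges and nodes yields one entry per would-be RedisNode.
def expand(shard: Shard): Seq[(ShardNode, Int, Int)] =
  for {
    (start, end) <- shard.slots
    node         <- shard.nodes
  } yield (node, start, end)

val shard = Shard(
  slots = Seq((0, 5460)),
  nodes = Seq(
    ShardNode("127.0.0.1", 7000, "master"),
    ShardNode("127.0.0.1", 7001, "replica")))
```

Whether the connector should actually surface replicas this way is exactly the open question of the PR; this only shows the shape of the iteration.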

@ff137

ff137 commented Oct 23, 2024

Let's go! 👏
