How to make it more Scala compatible?

Posted on

Problem

When you write a code in python code people always ask to make it more pythonic. I am not sure if there is a term for scala or not. However, I am pretty sure my following code use one or two scala feature.

Objective

  • Read a property file, which has location to XML file
  • Read source, destination and some other properties from property and XML File
  • Copy Data from one location to another location.

FileSystem.scala

bstract class FileSystem(flagFileURI: String){

  def getRecordCount(properties: Properties): String = {
    properties.getProperty("recordCount")
  }

  def getAuditID: String = {
    val instant: Instant = Instant.now
    val zoneId: ZoneId = ZoneId.of("Canada/Eastern")
    val auditIdFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
    val auditId = ZonedDateTime.ofInstant(instant, zoneId).format(auditIdFormatter)
    auditId
  }

  def getSourcePath(properties: Properties): Path = {
    val dataFileURI: String = properties.getProperty("dataFileURI")
    val srcPath = new Path(dataFileURI)
    srcPath
  }

  def getMetaDataFileURI(properties: Properties): String = {
    properties.getProperty("metadataFileURI")
  }

  def getIDPDataDate(properties: Properties): String = {
    properties.getProperty("idpDataDate")
  }
  def getDestinationPath(properties: Properties, metaDataFileURI: String, IDPDataDate: String, dataFileName: String): Path = {
    val l0DirLocation: String = ConfigFactory.load().getString("sparkFramework.hdfs.l0dirlocation")

    val frameWorkUtils = new FrameworkUtils()
    val sourceSystem: SourceSystem = frameWorkUtils.getSourceSystem(metaDataFileURI)
    val schemaName: String = frameWorkUtils.getSchemaName(sourceSystem)
    val tableName: String = frameWorkUtils.getTableName(sourceSystem)

    val destPath = new Path(l0DirLocation + schemaName + "/" + tableName + "/idp_data_date=" + IDPDataDate + "/" + dataFileName)

    destPath
  }

  def getDataFileName(srcPath: Path, auditID: String): String = {
    val dataFileName: String = srcPath.getName + "_" + getAuditID
    dataFileName
  }

}

HDFileSystem.scala

class HDFileSystem(flagFileURI: String) extends FileSystem(flagFileURI) {

  def copyFromLocalFile: (String, String, String, String, String) = {

    val properties: Properties = new Properties()
    val source: InputStreamReader = Source.fromFile(flagFileURI).reader
    properties.load(source)

    val metaDataFileURI = getMetaDataFileURI(properties)
    val srcPath = getSourcePath(properties)
    val auditId = getAuditID
    val dataFileName = getDataFileName(srcPath, auditId)
    val IDPDataDate = getIDPDataDate(properties)
    val destPath = getDestinationPath(properties, metaDataFileURI, IDPDataDate, dataFileName)
    val recordCount = getRecordCount(properties)

    val hadoopConf = new Configuration()
    val hdfs = FileSystem.get(hadoopConf)
    hdfs.copyFromLocalFile(true,false,srcPath, destPath)
    (IDPDataDate,recordCount,auditId,metaDataFileURI,dataFileName)

  }
}

Main.scala

 val hdFileSystem = new HDFileSystem(flagFileURI = args(0))
    val (idpDataDate,count,auditId,metadataFileURI,fileName) = hdFileSystem.copyFromLocalFile

Solution

What you have isn’t bad, but I feel like you’re creating too many variables. For example, in getAuditID and other methods, you assign an object to a variable and immediately return it. You also have some unnecessary type annotations.

def getAuditID: String = {
    val instant: Instant = Instant.now
    val zoneId: ZoneId = ZoneId.of("Canada/Eastern")
    val auditIdFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
    val auditId = ZonedDateTime.ofInstant(instant, zoneId).format(auditIdFormatter)
    auditId
}

After eliminating that, I got this:

abstract class FileSystem(flagFileURI: String) {

  def getRecordCount(properties: Properties): String =
    properties.getProperty("recordCount")

  def getAuditID: String =
    ZonedDateTime
      .ofInstant(Instant.now, ZoneId.of("Canada/Eastern"))
      .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"))

  def getSourcePath(properties: Properties): Path =
    new Path(properties.getProperty("dataFileURI"))

  def getMetaDataFileURI(properties: Properties): String =
    properties.getProperty("metadataFileURI")

  def getIDPDataDate(properties: Properties): String =
    properties.getProperty("idpDataDate")

  def getDataFileName(srcPath: Path, auditID: String): String =
    srcPath.getName + "_" + getAuditID

  def getDestinationPath(
      properties: Properties,
      metaDataFileURI: String,
      IDPDataDate: String,
      dataFileName: String
  ): Path = {
    val l0DirLocation =
      ConfigFactory.load().getString("sparkFramework.hdfs.l0dirlocation")
    val frameWorkUtils = new FrameworkUtils()
    val sourceSystem =
      frameWorkUtils.getSourceSystem(metaDataFileURI)
    val schemaName = frameWorkUtils.getSchemaName(sourceSystem)
    val tableName = frameWorkUtils.getTableName(sourceSystem)

    new Path(
      l0DirLocation + schemaName + "/" + tableName + "/idp_data_date=" + IDPDataDate + "/" + dataFileName
    )
  }
}

However, your other method, copyFromLocalFile, could also be improved. The return type is a tuple of 5 strings, which to me seems much too complex. Instead of that, I’d suggest making a case class that makes it obvious what each of those strings mean.

case class FileInfo(
    IDPDataDate: String,
    recordCount: String,
    auditId: String,
    metaDataFileURI: String,
    dataFileName: String
)

Then you can define this method in FileSystem to get all the information at once

def getFileInfo(properties: Properties): FileInfo = {
    val auditId = getAuditID
    FileInfo(
      getIPDDataDate(properties),
      getRecordCount(properties),
      auditId,
      getMetaDataFileURI(properties),
      getDataFileName(getSourcePath(properties), auditID)
    )
}

After that, you can turn your method into something like this:

def copyFromLocalFile: FileInfo = {

    val properties: Properties = new Properties()

    properties.load(Source.fromFile(flagFileURI).reader)

    val fileInfo = getFileInfo(properties)

    FileSystem
      .get(new Configuration())
      .copyFromLocalFile(
        true,
        false,
        fileInfo.srcPath,
        getDestinationPath(
          properties,
          fileInfo.metaDataFileURI,
          fileInfo.IDPDataDate,
          fileInfo.dataFileName
        )
      )

    fileInfo
}

You can still destructure the result:

val FileInfo(idpDataDate, count, auditId, metadataFileURI, fileName) =
  hdFileSystem.copyFromLocalFile

Of course, I don’t know about the whole structure of your program, so this might not work for you.

Link to Scastie

Leave a Reply

Your email address will not be published. Required fields are marked *