BigData|ទាញទិន្ន័យពី T24 ដោយប្រើ Spark

នេះជាកូដដែលខ្ញុំប្រើដើម្បីទាញទិន្ន័យពីT24 core banking system ដែលប្រើ ប្រាស់ Oracle DB . ក្នុងកូដនេះខ្ញុំប្រើ Big Data Technology stack ដូចជា Apache Hadoop, Apache Spark Scala programming

_config.yml

package com.company.projects.app.T24

import java.text.SimpleDateFormat

import org.apache.spark.sql.SparkSession

/*
 1. Extract the header from FileLayout (<c1>, <c2>, <c3>) : done
 2. Extract each row data of the table (Oracle at DR site) , there are 2 rows (RECID, XMLRECORD) : done
 3. Transform the row into header + row data (xml record) : doing (80%)
 4. Load the extracted into csv file or RDBMS :  todo
* */
object ATMBranches {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.functions.udf
    val toString = udf((payload: Array[Byte]) => new String(payload))
    val format = new SimpleDateFormat("d-M-y")

    val spark = SparkSession.builder()
      .appName("ConnectingOracleDatabase")
      .master("local")
      .getOrCreate()

    //ORACLE
    val jdbcUrl = "jdbc:oracle:thin:@DR-CBS-CDB01:1422:drcbs"
    val jdbcUsername = "username"
    val jdbcPassword = "password"

    // Check connectivity to ORACLE Server
    import java.sql.DriverManager
    val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
    connection.isClosed()

    val query =
      s"""(SELECT
         |RECID as RECID,
         |extractvalue(XMLRECORD,'/row/c2') as BRANCH_CODE,
         |extractvalue(XMLRECORD,'/row/c3') as DEF_CR_ACCT,
         |extractvalue(XMLRECORD,'/row/c4') as PROC_CODE,
         |extractvalue(XMLRECORD,'/row/c5') as CR_CCY,
         |extractvalue(XMLRECORD,'/row/c6') as CR_ACCT,
         |extractvalue(XMLRECORD,'/row/c7') as RESERVED_5,
         |extractvalue(XMLRECORD,'/row/c8') as RESERVED_4,
         |extractvalue(XMLRECORD,'/row/c9') as RESERVED_3,
         |extractvalue(XMLRECORD,'/row/c10') as RESERVED_2,
         |extractvalue(XMLRECORD,'/row/c11') as RESERVED_11,
         |extractvalue(XMLRECORD,'/row/c12') as USE_DEF_ACCT,
         |extractvalue(XMLRECORD,'/row/c13') as LOCAL_REF,
         |extractvalue(XMLRECORD,'/row/c14') as RESERVED_10,
         |extractvalue(XMLRECORD,'/row/c15') as RESERVED_9,
         |extractvalue(XMLRECORD,'/row/c16') as RESERVED_8,
         |extractvalue(XMLRECORD,'/row/c17') as RESERVED_7,
         |extractvalue(XMLRECORD,'/row/c18') as RESERVED_6,
         |extractvalue(XMLRECORD,'/row/c19') as RECORD_STATUS,
         |extractvalue(XMLRECORD,'/row/c20') as CURR_NO,
         |extractvalue(XMLRECORD,'/row/c21') as INPUTTER,
         |extractvalue(XMLRECORD,'/row/c22') as DATE_TIME,
         |extractvalue(XMLRECORD,'/row/c23') as AUTHORISER,
         |extractvalue(XMLRECORD,'/row/c24') as CO_CODE,
         |extractvalue(XMLRECORD,'/row/c25') as DEPT_CODE,
         |extractvalue(XMLRECORD,'/row/c26') as AUDITOR_CODE,
         |extractvalue(XMLRECORD,'/row/c27') as AUDIT_DATE_TIME
         |FROM CBSPROD.F_ATM_BRANCH ) tmp """.stripMargin


    val DF = spark.read
      .format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", query)
      .option("user", jdbcUsername)
      .option("password", jdbcPassword)
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .load()
    // DF.createOrReplaceTempView("v_categ_entry")
    DF.show(100, truncate = false)

    print("TOTAL: "+DF.count())

    /* WRITE TO SQL IF NEED */
    val url = "jdbc:sqlserver://AHODKT0193;databaseName=ASS;integratedSecurity=true";
    import java.util.Properties
    val connectionProperties = new Properties()
    connectionProperties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver" )
    DF.write.jdbc(url, "ATM_Terminal", connectionProperties)

    DF.unpersist()
    spark.stop()

  }
}
Written on May 8, 2019