spark.read.parquet("s3://backet/data/year=20{20,19}/month=*/day=[01]*").Closest alternative will be:
spark.read.parquet("s3://backet/data").where("[insane sql here]")Timewise first approach is odrers of magnitude faster.
Thoughts on software development
spark.read.parquet("s3://backet/data/year=20{20,19}/month=*/day=[01]*").Closest alternative will be:
spark.read.parquet("s3://backet/data").where("[insane sql here]")Timewise first approach is odrers of magnitude faster.
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{OneHotEncoder, VectorAssembler}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

val sc: SparkContext = new SparkContext(master, "SuperApp", System.getenv("SPARK_HOME"))
val session: SparkSession = SparkSession.builder().getOrCreate()

// First, define the schema
val struct = StructType(
  StructField("price", DoubleType, false) ::
  StructField("_id", StringType, false) ::
  StructField("modelYearId", IntegerType, false) ::
  StructField("zip", IntegerType, false) ::
  StructField("modelYear", IntegerType, false) ::
  StructField("modelId", IntegerType, false) ::
  StructField("makeId", IntegerType, false) ::
  StructField("mileage", DoubleType, false) :: Nil)

val df: DataFrame = session.sqlContext.read.schema(struct)
  .option("header", "true")
  .csv("cars.csv")

var data: DataFrame = df

// Transform the variable into a categorical one
data = new OneHotEncoder()
  .setInputCol("zip")
  .setOutputCol("zipVec")
  .transform(data)

// Assemble the features that matter
// (modelYearIdVec, modelIdVec and makeIdVec are assumed to come from
// OneHotEncoder stages analogous to the zip one, elided here)
val assembler = new VectorAssembler()
  .setInputCols(Array("modelYearIdVec", "zipVec", "modelYear", "modelIdVec", "makeIdVec", "mileage"))
  .setOutputCol("features")

// Verify the schema is what we want
data.printSchema()
data = assembler.transform(data)
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

val lr = new LinearRegression()
  .setLabelCol("price")
  .setMaxIter(200)
  .setRegParam(10)
  .setElasticNetParam(0.75)

// Fit the model
val lrModel: LinearRegressionModel = lr.fit(trainingData)

// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
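The summary above describes the fit on trainingData only; the held-out 30% is what the split was for. A small follow-up sketch of mine (not from the original code) scoring the test set with Spark ML's RegressionEvaluator:

import org.apache.spark.ml.evaluation.RegressionEvaluator

// Apply the fitted model to unseen data and measure RMSE on it
val predictions = lrModel.transform(testData)
val evaluator = new RegressionEvaluator()
  .setLabelCol("price")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
println(s"Test RMSE: ${evaluator.evaluate(predictions)}")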
First, a quite readable solution:

Step 1: convert to lowercase.
Step 2: replace every non-alphanumeric character with a hyphen (-), from start to end of the string; Unicode letters are not considered.
Step 3: collapse adjacent hyphens (-) into one.
Step 4: remove leading and trailing hyphens.
public static final String CHARS_REG = "[^a-zA-Z0-9]";
public static final String DOUBLE_REG = "[-]+";
public static final String TRIM_REG = "-$|^-";
public static final String DASH = "-";

static final Pattern CHARS = Pattern.compile(CHARS_REG);
static final Pattern DOUBLE = Pattern.compile(DOUBLE_REG);
static final Pattern TRIM = Pattern.compile(TRIM_REG);

public static String makeSlugNamePatterny(String name) {
    String it = name.toLowerCase().trim();
    it = CHARS.matcher(it).replaceAll(DASH);   // non-alphanumeric -> hyphen
    it = DOUBLE.matcher(it).replaceAll(DASH);  // collapse runs of hyphens
    it = TRIM.matcher(it).replaceAll("");      // strip leading/trailing hyphen
    return it;
}

Unfortunately, out-of-the-box Java regex support cannot trim or lowercase, so we end up with five operations here. I was curious to implement it regex-free and compare the performance:
public static String makeSlugNameOptimized(String name) {
    // The trailing sentinel space guarantees the last real character is processed
    String result = name + " ";
    StringBuilder temp = new StringBuilder();
    char ch1 = 0;
    char ch2 = toLowCaseAlfanumeric(result.charAt(0));
    for (int i = 0; i < result.length() - 1; i++) {
        ch1 = ch2;
        ch2 = toLowCaseAlfanumeric(result.charAt(i + 1));
        if (ch2 != ch1) {
            temp.append(ch1);
        } else if ('-' != ch1) { // collapse runs of hyphens to a single one
            temp.append(ch1);
        }
    }
    // Strip leading/trailing hyphens (length guards protect against
    // an all-punctuation input leaving the builder empty)
    while (temp.length() > 0 && '-' == temp.charAt(0)) {
        temp.deleteCharAt(0);
    }
    while (temp.length() > 0 && '-' == temp.charAt(temp.length() - 1)) {
        temp.deleteCharAt(temp.length() - 1);
    }
    return temp.toString();
}

private static final int Aa = 'a' - 'A';

private static char toLowCaseAlfanumeric(char c) {
    if (c >= 'a' && c <= 'z') {
        return c;
    }
    if (c >= 'A' && c <= 'Z') {
        return (char) (c + Aa);
    }
    if (c >= '0' && c <= '9') {
        return c;
    }
    return '-';
}

Note that the optimized version doesn't trim or lowercase in separate passes; both happen inside the single loop.
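To check that both implementations agree and to get a rough speed comparison, here is a deliberately crude harness of mine, dropped into the same class as the two methods; a proper benchmark would use JMH, and the input string and iteration counts are arbitrary choices:

public static void main(String[] args) {
    String input = " Mercedes-Benz C-Class (W205), 2019 -- low mileage! ";
    // Both implementations should produce the same slug
    System.out.println(makeSlugNamePatterny(input));  // mercedes-benz-c-class-w205-2019-low-mileage
    System.out.println(makeSlugNameOptimized(input)); // same

    int rounds = 1_000_000;
    // Warm-up so the JIT compiles both paths before timing
    for (int i = 0; i < 100_000; i++) {
        makeSlugNamePatterny(input);
        makeSlugNameOptimized(input);
    }
    long t0 = System.nanoTime();
    for (int i = 0; i < rounds; i++) makeSlugNamePatterny(input);
    long t1 = System.nanoTime();
    for (int i = 0; i < rounds; i++) makeSlugNameOptimized(input);
    long t2 = System.nanoTime();
    System.out.printf("regex: %d ms, loop-based: %d ms%n",
            (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
}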
@ImportResource("classpath:application-context/applicationContext.xml") //@Component public class BatchConfiguration
public static void main(String[] args) throws IOException {
    if (args.length >= 1) {
        Properties p = new Properties();
        p.load(new FileReader(args[0]));
        // Promote every entry to a JVM system property before Spring starts
        p.forEach((x, y) -> System.setProperty((String) x, (String) y));
    }
    SpringApplication.run(Application.class, args);
}
bootRun {
    args = ["cars-etl.properties"]
}
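Any standard Spring keys can live in that file, since main() copies every entry into a JVM system property before the context starts. A hypothetical cars-etl.properties (illustrative values only, not from the original post):

# cars-etl.properties -- illustrative values
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=cars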
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteResult;
import com.mongodb.DBObject;
import org.springframework.batch.item.ItemWriter;
import org.springframework.data.mongodb.core.MongoTemplate;

import java.util.List;

public class MongoBulkItemWriter<T> implements ItemWriter<T> {

    private final String collection;
    private final MongoTemplate template;

    public MongoBulkItemWriter(String collection, MongoTemplate mongoTemplate) {
        this.collection = collection;
        this.template = mongoTemplate;
    }

    @Override
    public void write(List<? extends T> items) throws Exception {
        // One unordered bulk insert per chunk instead of a round-trip per item
        BulkWriteOperation bulk = template.getCollection(collection).initializeUnorderedBulkOperation();
        items.forEach(i -> bulk.insert((DBObject) template.getConverter().convertToMongoType(i)));
        BulkWriteResult result = bulk.execute();
    }
}
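For context, a sketch of how such a writer could be wired into a chunk-oriented Spring Batch step; the Car item type, the step name and the chunk size here are assumptions of mine, not from the post:

@Bean
public Step loadCarsStep(StepBuilderFactory steps,
                         ItemReader<Car> reader,
                         MongoTemplate mongoTemplate) {
    return steps.get("loadCars")
            // Each chunk of 1000 items arrives at write() as one bulk insert
            .<Car, Car>chunk(1000)
            .reader(reader)
            .writer(new MongoBulkItemWriter<>("cars", mongoTemplate))
            .build();
}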
BulkWriteOperation bulk = template.getCollection(COLLECTION_NAME).initializeUnorderedBulkOperation();
updates.forEach(u -> {
    bulk.find(new BasicDBObject("id", u.getId())).upsert().update(u.getDbObject());
});
bulk.execute();
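One detail to watch: with the legacy driver, update() expects an operator-style document (top-level keys like $set), not a plain replacement document, so getDbObject() has to produce something like the following; replaceOne() is the call for full-document replacement. A hypothetical sketch with made-up field values:

BasicDBObject fields = new BasicDBObject("price", 19999.0)
        .append("mileage", 42000.0);
// Upsert by id, setting only the changed fields
bulk.find(new BasicDBObject("id", u.getId()))
    .upsert()
    .update(new BasicDBObject("$set", fields));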