UDF来源于Hive,Hive可以允许用户编写自己定义的函数UDF,然后在查询中进行使用。星环Inceptor中的UDF开发规范与Hive相同,目前有3种UDF:
UDF(User Defined Function),即用户自定义函数,能结合SQL语句一起使用,更好地表达复杂的业务逻辑,一般以单个数据行为参数,输出单个数据行;比如数学函数、字符串函数、时间函数、拼接函数
UDTF(User Defined Table Function),即用户自定义表函数,它与UDF类似。区别在于UDF只能实现一对一,而它用来实现多(行/列)对多(行/列)数据的处理逻辑。一般以一个数据行为参数,输出多个数据行为一个表作为输出,如lateral、view、explore;
UDAF(User Defined Aggregate Function),用户自定义聚合函数,是由用户自主定义的,用法同如MAX、MIN和SUM已定义的聚合函数一样的处理函数。而且,不同于只能处理标量数据的系统定义的聚合函数,UDAF的可以接受并处理更广泛的数据类型,如用对象类型、隐式类型或者LOB存储的多媒体数据。由于UDAF也属于聚合函数中的一种,同样也需要与GROUPBY结合使用。
一般UDAF以多个数据行为参数,接收多个数据行,并输出一个数据行,比如COUNT、MAX;
星环Quark计算引擎中内置了很多函数,同时支持用户自行扩展,按规则添加后即可在sql执行过程中使用,目前支持UDF、UDTF、UDAF三种类型,一般UDF应用场景较多,后面将着重介绍UDF的开发与使用。UDAF及UDTF将主要介绍开发要点以及Demo示例。
Quark的UDF接口兼容开源Hive的UDF接口,用户可以参考开源Hive的UDF手册,或者直接把开源Hive的UDF迁移到Quark上。
Quark类型 | Java原始类型 | Java包装类 | hadoop.hive.ioWritable |
---|---|---|---|
tinyint | byte | Byte | ByteWritable |
smallint | short | Short | ShortWritable |
int | int | Integer | IntWritable |
bigint | long | Long | LongWritable |
string | - | String | Text |
char | char | Character | HiveCharWritable |
boolean | boolean | Boolean | BooleanWritable |
float | float | Float | FloatWritable |
double | double | Double | DoubleWritable |
decimal | - | BigDecimal | HiveDecimalWritable |
date | - | Date | DateWritable |
array | - | List | ArrayListWritable |
Map<K,V> | - | Map<K.V> | HashMapWritable |
Quark 提供了两个实现 UDF 的方式:
一般在以下几种场景下考虑使用GenericUDF:
pom文件的依赖导入
--UDF开发依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>inceptor-exec</artifactId>
<version>xxx</version>
</dependency>
该方式实现简单,只需新建一个类继承org.apache.hadoop.hive.ql.exec.UDF;
继承UDF类必须实现evaluate方法且返回值类型不能为 void,支持定义多个evaluate方法不同参数列表用于处理不同类型数据;
可通过完善@Description
展示UDF用法 UDF样例。
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;
@Description(
name="my_plus",
value="my_plus() - if string, do concat; if integer, do plus",
extended = "Example : \n >select my_plus('a', 'b');\n >ab\n >select my_plus(3, 5);\n >8"
)
/**
* 实现UDF函数,若字符串执行拼接,int类型执行加法运算。
*/
public class AddUDF extends UDF {
/**
* 编写一个函数,要求如下:
* 1. 函数名必须为 evaluate
* 2. 参数和返回值类型可以为:Java基本类型、Java包装类、org.apache.hadoop.io.Writable等类型、List、Map
* 3. 函数一定要有返回值,不能为 void
*/
public String evaluate(String... parameters) {
if (parameters == null || parameters.length == 0) {
return null;
}
StringBuilder sb = new StringBuilder();
for (String param : parameters) {
sb.append(param);
}
return sb.toString();
}
/**
* 支持函数重载
*/
public int evaluate(IntWritable... parameters) {
if (parameters == null || parameters.length == 0) {
return 0;
}
long sum = 0;
for (IntWritable currentNum : parameters) {
sum = Math.addExact(sum, currentNum.get());
}
return (int) sum;
}
}
GenericUDF相比与UDF功能更丰富,支持所有参数类型,实现起来也更加复杂。org.apache.hadoop.hive.ql.udf.generic.GenericUDF
API提供了一个通用的接口将任何数据类型的对象当作泛型Object去调用和输出,参数类型由ObjectInspector封装;参数Writable类由DeferredObject封装,使用时简单类型可直接从Writable获取,复杂类型可由ObjectInspector解析。
Java的ObjectInspector类,用于帮助Quark了解复杂对象的内部架构,通过创建特定的ObjectInspector对象替代创建具体类对象,在内存中储存某类对象的信息。在UDF中,ObjectInspector用于帮助Hive引擎将HQL转成MR Job时确定输入和输出的数据类型。Hive语句会生成MapReduce Job执行,所以使用的是Hadoop数据格式,不是编写UDF的Java的数据类型,比如Java的int在Hadoop为IntWritable,String在Hadoop为Text格式,所以我们需要将UDF内的Java数据类型转成正确的Hadoop数据类型以支持Hive将HQL生成MapReduce Job。
继承 GenericUDF 后,必须实现以下三个方法:
public class MyCountUDF extends GenericUDF {
private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
private transient ObjectInspectorConverters.Converter intConverter;
private transient ObjectInspectorConverters.Converter longConverter;
// 初始化
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
}
// DeferredObject封装实际参数的对应Writable类
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
}
// 函数信息
@Override
public String getDisplayString(String[] strings) {
}
}
initialize()方法只在 GenericUDF 初始化时被调用一次,执行一些初始化操作,包括:参数个数检查;参数类型检查与转换;确定返回值类型。
a. 参数个数检查;
可通过 arguments 数组的长度来判断函数参数的个数:
// 检查该记录是否传过来正确的参数数量,arguments的长度不为2时,则抛出异常
if (arguments.length != 2) {
throw new UDFArgumentLengthException("arrayContainsExample only takes 2 arguments: List<T>, T");
}
b. 参数类型检查与转换;
针对该UDF的每个参数,initialize()方法都会收到一个对应的ObjectInspector参数,通过遍历ObjectInspector数组检查每个参数类型,根据参数类型构造ObjectInspectorConverters.Converter,用于将Hive传递的参数类型转换为对应的Writable封装对象ObjectInspector,供后续统一处理。
ObjectInspector内部有一个枚举类 Category,代表了当前 ObjectInspector 的类型。
public interface ObjectInspector extends Cloneable {
public static enum Category {
PRIMITIVE, // Hive原始类型
LIST, // Hive数组
MAP, // Hive Map
STRUCT, // 结构体
UNION // 联合体
};
}
Quark原始类型又细分了多种子类型,PrimitiveObjectInspector 实现了 ObjectInspector,可以更加具体的表示对应的Hive原始类型。
public interface PrimitiveObjectInspector extends ObjectInspector {
/**
* The primitive types supported by Quark.
*/
public static enum PrimitiveCategory {
VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
UNKNOWN
};
}
参数类型检查与转换示例:
for (int i = 0; i < length; i++) { // 遍历每个参数
ObjectInspector currentOI = arguments[i];
ObjectInspector.Category type = currentOI.getCategory(); // 获取参数类型
if (type != ObjectInspector.Category.PRIMITIVE) { // 检查参数类型
throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
}
PrimitiveObjectInspector.PrimitiveCategory primitiveType =
((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
inputType[i] = primitiveType;
switch (primitiveType) { // 参数类型转换
case INT:
if (intConverter == null) {
ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
}
break;
case LONG:
if (longConverter == null) {
ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
}
break;
default:
throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
}
}
c. 确定函数返回值类型
initialize() 需要 return 一个 ObjectInspector 实例,用于表示自定义UDF返回值类型。initialize() 的返回值决定了 evaluate() 的返回值类型。创建ObjectInspector时,不要用new的方式创建,应该用工厂模式去创建以保证相同类型的ObjectInspector只有一个实例,且同一个ObjectInspector可以在代码中多处被使用。
// 自定义UDF返回值类型为Long
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
完整的 initialize() 函数
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
int length = arguments.length;
inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
for (int i = 0; i < length; i++) {
ObjectInspector currentOI = arguments[i];
ObjectInspector.Category type = currentOI.getCategory();
if (type != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
}
PrimitiveObjectInspector.PrimitiveCategory primitiveType =
((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
inputType[i] = primitiveType;
switch (primitiveType) {
case INT:
if (intConverter == null) {
ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
}
break;
case LONG:
if (longConverter == null) {
ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
}
break;
default:
throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
}
}
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
evaluate()方法是GenericUDF的核心方法,自定义UDF的实现逻辑。代码实现步骤可以分为三部分:参数接收;自定义UDF核心逻辑;返回处理结果。
第一步:参数接收
evaluate() 的参数就是 自定义UDF 的参数。
/**
* Evaluate the GenericUDF with the arguments.
*
* @param arguments
* The arguments as DeferedObject, use DeferedObject.get() to get the
* actual argument Object. The Objects can be inspected by the
* ObjectInspectors passed in the initialize call.
* @return The
*/
public abstract Object evaluate(DeferredObject[] arguments)
throws HiveException;
通过源码注释可知,DeferedObject.get() 可以获取参数的值。
/**
* A Defered Object allows us to do lazy-evaluation and short-circuiting.
* GenericUDF use DeferedObject to pass arguments.
*/
public static interface DeferredObject {
void prepare(int version) throws HiveException;
Object get() throws HiveException;
};
再看看 DeferredObject 的源码,DeferedObject.get() 返回的是 Object,传入的参数不同,会是不同的Java类型。
第二步:自定义UDF核心逻辑
这一部分根据实际项目需求自行编写。
第三步:返回处理结果
这一步和 initialize() 的返回值一一对应,基本类型返回值有两种:Writable类型 和 Java包装类型:
evalute()示例
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
LongWritable out = new LongWritable();
for (int i = 0; i < deferredObjects.length; i++) {
PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
Object param = deferredObjects[i].get();
switch (type) {
case INT:
Object intObject = intConverter.convert(param);
out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
break;
case LONG:
Object longObject = longConverter.convert(param);
out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
break;
default:
throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
}
}
return out;
}
getDisplayString() 返回的是 explain 时展示的信息。这里不能return null,否则可能在运行时抛出空指针异常。
@Override
public String getDisplayString(String[] strings) {
return "my_count(" + Joiner.on(", ").join(strings) + ")";
}
自定义GenericUDF完整示例
@Description(
name="my_count",
value="my_count(...) - count int or long type numbers",
extended = "Example :\n >select my_count(3, 5);\n >8\n >select my_count(3, 5, 25);\n >33"
)
public class MyCountUDF extends GenericUDF {
private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
private transient ObjectInspectorConverters.Converter intConverter;
private transient ObjectInspectorConverters.Converter longConverter;
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
int length = objectInspectors.length;
inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
for (int i = 0; i < length; i++) {
ObjectInspector currentOI = objectInspectors[i];
ObjectInspector.Category type = currentOI.getCategory();
if (type != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
}
PrimitiveObjectInspector.PrimitiveCategory primitiveType =
((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
inputType[i] = primitiveType;
switch (primitiveType) {
case INT:
if (intConverter == null) {
ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
}
break;
case LONG:
if (longConverter == null) {
ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
}
break;
default:
throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
}
}
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
LongWritable out = new LongWritable();
for (int i = 0; i < deferredObjects.length; i++) {
PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
Object param = deferredObjects[i].get();
switch (type) {
case INT:
Object intObject = intConverter.convert(param);
out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
break;
case LONG:
Object longObject = longConverter.convert(param);
out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
break;
default:
throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
}
}
return out;
}
@Override
public String getDisplayString(String[] strings) {
return "my_count(" + Joiner.on(", ").join(strings) + ")";
}
}
UDTF函数作用都是输入一行数据,将该行数据拆分、并返回多行数据。不同的UDTF函数只是拆分的原理不同、作用的数据格式不同而已。
注意:返回UDTF结果的同时查询其他对象,须引用关键字 LATERAL VIEW
1. 实现UDTF函数需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2. 然后重写/实现initialize, process, close三个方法
initialize初始化:UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型,名称)。initialize针对任务调一次, 作用是定义输出字段的列名、和输出字段的数据类型。
initialize方法示例
@Override
/**
* 返回数据类型:StructObjectInspector
* 定义输出数据的列名、和数据类型。
*/
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
List<String> fieldNames = new ArrayList<String>(); //fieldNames为输出的字段名
fieldNames.add("world");
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); //类型,列输出类型
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
process:初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果写出。process传入一行数据写出去多次,传入一行数据输出多行数据,如:mapreduce单词计数。process针对每行数据调用一次该方法。在initialize初始化的时候,定义输出字段的数据类型是集合,调用forward()将数据写入到一个缓冲区,写入缓冲区的数据也要是集合。
process方法示例
//数据的集合
private List<String> dataList = new ArrayList<String>();
/**
* process(Object[] objects) 参数是一个数组,但是hive中的explode函数接受的是一个,一进多出
* @param args
* @throws HiveException
*/
public void process(Object[] args) throws HiveException {
//我们现在的需求是传入一个数据,在传入一个分割符
//1.获取数据
String data = args[0].toString();
//2.获取分割符
String splitKey = args[1].toString();
//3.切分数据,得到一个数组
String[] words = data.split(splitKey);
//4.想把words里面的数据全部写出去。类似在map方法中,通过context.write方法
// 定义是集合、写出去是一个string,类型不匹配,写出也要写出一个集合
for (String word : words) {
//5.将数据放置集合,EG:传入"hello,world,hdfs"---->写出需要写n次,hello\world
dataList.clear();//清空数据集合
dataList.add(word);
//5.写出数据的操作
forward(dataList);
}
}
最后close()方法调用,对需要清理的方法进行清理,close()方法针对整个任务调一次
下面UDTF 实现的是字符串的分拆,多行输出
package io.transwarp.udtf;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class SplitUDF extends GenericUDTF{
@Override
public void close() throws HiveException {
// TODO Auto-generated method stub
}
@Override
public StructObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
// TODO Auto-generated method stub
if(arg0.length != 1){
throw new UDFArgumentLengthException("SplitString only takes one argument");
}
if(arg0[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentException("SplitString only takes string as a parameter");
}
ArrayList<String> fieldNames = new ArrayList<>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
fieldNames.add("col1");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("col2");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] arg0) throws HiveException {
// TODO Auto-generated method stub
String input = arg0[0].toString();
String[] inputSplits = input.split("#");
for (int i = 0; i < inputSplits.length; i++) {
try {
String[] result = inputSplits[i].split(":");
forward(result);
} catch (Exception e) {
continue;
}
}
}
}
执行效果如下:
将UDTF打包后,放在inceptor server 所在节点之上(建议不要放在/user/lib/hive/lib/下),之后在连接inceptor执行以下命令,生成临时函数(server有效,重启inceptor失效)
add jar /tmp/timestampUDF.jar
drop temporary function timestamp_ms;
create temporary function timestamp_ms as 'io.transwarp.udf.ToTimestamp';
select date, timestamp_ms(date) from table1;
正如前面所说,UDAF是由用户自主定义的,虽然UDAF的使用可以方便对数据的运算处理,但是使用它的数量建议不要过多,因为UDAF的数量增长和性能下降成线性关系。另外,如果存在大量的嵌套UDAF,系统的性能也会降低,建议用户在可能的情况下写一个没有嵌套或者嵌套较少的UDAF实现相同功能来提高性能。
1. 用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;
2. 用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。
3. 一个计算函数必须实现的5个方法的具体含义如下:
下面的UDAF DEMO目标是实现找到最大值功能,以表中某一字段为参数,返回最大值。
package udaf.transwarp.io;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
//UDAF是输入多个数据行,产生一个数据行
//用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
public class MaxiNumber extends UDAF{
public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
//最终结果
private IntWritable result;
//负责初始化计算函数并设置它的内部状态,result是存放最终结果的
@Override
public void init() {
result=null;
}
//每次对一个新值进行聚集计算都会调用iterate方法
public boolean iterate(IntWritable value)
{
if(value==null)
return false;
if(result==null)
result=new IntWritable(value.get());
else
result.set(Math.max(result.get(), value.get()));
return true;
}
//Hive需要部分聚集结果的时候会调用该方法
//会返回一个封装了聚集计算当前状态的对象
public IntWritable terminatePartial()
{
return result;
}
//合并两个部分聚集值会调用这个方法
public boolean merge(IntWritable other)
{
return iterate(other);
}
//Hive需要最终聚集结果时候会调用该方法
public IntWritable terminate()
{
return result;
}
}
}
将开发好自定义UDF函数的项目打包成jar包,注意:jar 包中的自定义UDF 类名,不能和现有UDF 类,在包名+类名上,完全相同。
常见的UDF部署方式有以下三种:
由于篇幅原因,具体打包的方式以及示例教程请参考: UDF部署视频教程