Hive内置了很多函数,可以参考Hive Built-In Functions。但是有些情况下,这些内置函数还是不能满足我们的需求,这时候就需要UDF出场了。
UDF全称:User-Defined Functions,即用户自定义函数,在Hive SQL编译成MapReduce任务时,执行java方法,类似于像MapReduce执行过程中加入一个插件,方便扩展。
Hive三种自定义函数:
- UDF(user-defined function)一进一出,给定一个输入,输出一个处理后的数据。许多Hive内置字符串,数学函数,时间函数都是这种类型。大多数情况下编写对应功能的处理函数就能满足需求。如:concat, split, length, rand等。这种UDF主要有两种写法:继承实现UDF类和继承GenericUDF类(通用UDF)。
- UDAF(user-defined aggregate function)多进一出,属于聚合函数,类似于count、sum等函数。一般配合group by使用。主要用于累加操作,常见的函数有max, min, count, sum, collect_set等。这种UDF主要有两种写法:继承实现UDAF类和继承实现AbstractGenericUDAFResolver类。
- UDTF(user-defined table function)一进多出,将输入的一行数据产生多行数据或者将一列打成多列。如explode函数通常配合lateral view使用,实现列转行的功能。parse_url_tuple将一列转为多列。

在决定编写自己的函数之前,可以先看看Hive中已经有了哪些函数及其功能:
- show functions; –查看已有函数
- describe function concat; –查看函数用法
- describe function extended concat; –查看函数的用法和示例
UDF(user-defined function)
Hive提供了两个实现UDF的方式:
第一种:继承UDF类
优点:
- 实现简单
- 支持Hive的基本类型、数组和Map
- 支持函数重载
缺点:
- 逻辑较为简单,只适合用于实现简单的函数
这种方式编码少,代码逻辑清晰,可以快速实现简单的UDF
第一种方式的代码实现最为简单,只需新建一个类继承UDF,然后编写evaluate()即可。
import org.apache.hadoop.hive.ql.exec.UDF;
/**
* 继承org.apache.hadoop.hive.ql.exec.UDF
*/
public class SimpleUDF extends UDF {
/**
* 编写一个函数,要求如下:
* 1.函数名必须为evaluate
* 2.参数和返回值类型可以为:Java基本类型、Java包装类、org.apache.hadoop.io.Writable等类型、List、Map
* 3.函数一定要有返回值,不能为void
*/
public int evaluate(int a, int b) {
return a + b;
}
/**
* 支持函数重载
*/
public Integer evaluate(Integer a, Integer b, Integer c) {
if(a == null || b == null || c == null)
return 0;
return a + b + c;
}
}
继承UDF类的方式非常简单,但还有一些需要注意的地方:
- evaluate()方法并不是继承自UDF类
- evaluate()的返回值类型不能为void
支持hive基本类型、数组和MapHive基本类型
Java可以使用Java原始类型、Java包装类或对应的Writable类对于基本类型,最好不要使用Java原始类型,当null传给Java原始类型参数时,UDF会报错。Java包装类还可以用于null值判断
| Hive类型 | Java原始类型 | Java包装类 | hadoop.io.Writable |
| tinyint | byte | Byte | ByteWritable |
| smallint | short | Short | ShortWritable |
| int | int | Integer | IntWritable |
| bigint | long | Long | LongWritable |
| string | String | – | Text |
| boolean | boolean | Boolean | BooleanWritable |
| float | float | Float | FloatWritable |
| double | double | Double | DoubleWritable |
数组和Map
| Hive类型 | Java类型 |
| array | List |
| Map<K,V> | Map<K,V> |
第二种:继承GenericUDF类
优点:
- 支持任意长度、任意类型的参数
- 可以根据参数个数和类型实现不同的逻辑
- 可以实现初始化和关闭资源的逻辑(initialize、close)
缺点:
- 实现比继承UDF要复杂一些
与继承UDF相比,GenericUDF更加灵活,可以实现更为复杂的函数
继承GenericUDF后,必须实现其三个方法:
- initialize()
- evaluate()
- getDisplayString()
initialize()
/** * 初始化 GenericUDF,每个 GenericUDF 示例只会调用一次初始化方法 * * @param arguments * 自定义 UDF 参数的 ObjectInspector 实例 * @throws UDFArgumentException * 参数个数或类型错误时,抛出该异常 * @return 函数返回值类型 */ public abstract ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException;
initialize() 在函数在 GenericUDF 初始化时被调用一次,执行一些初始化操作,包括:
- 判断函数参数个数
- 判断函数参数类型
- 确定函数返回值类型
除此之外,用户在这里还可以做一些自定义的初始化操作,比如初始化 HDFS 客户端等1. 判断函数参数个数
可通过 arguments 数组的长度来判断函数参数的个数
判断函数参数个数示例:
// 1. 参数个数检查,只有一个参数
if (arguments.length != 1) // 函数只接受一个参数
throw new UDFArgumentException("函数需要一个参数"); // 当自定义 UDF 参数与预期不符时,抛出异常
2. 判断函数参数类型
ObjectInspector 可用于侦测参数数据类型,其内部有一个枚举类 Category,代表了当前 ObjectInspector 的类型
public interface ObjectInspector extends Cloneable {
public static enum Category {
PRIMITIVE, // Hive 原始类型
LIST, // Hive 数组
MAP, // Hive Map
STRUCT, // 结构体
UNION // 联合体
};
}
Hive 原始类型又细分了多种子类型,PrimitiveObjectInspector 实现了 ObjectInspector,可以更加具体的表示对应的 Hive 原始类型
public interface PrimitiveObjectInspector extends ObjectInspector {
/**
* The primitive types supported by Hive.
*/
public static enum PrimitiveCategory {
VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
UNKNOWN
};
}
参数类型判断示例:
// 2. 参数类型检查,参数类型为 String
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE // 参数是 Hive 原始类型
|| !PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector)arguments[0]).getPrimitiveCategory())) // 参数是 Hive 的 string 类型
throw new UDFArgumentException("函数第一个参数为字符串"); // 当自定义 UDF 参数与预期不符时,抛出异常
3. 确定函数返回值类型
initialize() 需要 return 一个 ObjectInspector 实例,用于表示自定义 UDF 返回值类型。initialize() 的返回值决定了 evaluate() 的返回值类型
ObjectInspector 的源码中,有这么一段注释,其大意是 ObjectInspector 的实例应该由对应的工厂类获取,以保证实例的单例等属性
/**
* An efficient implementation of ObjectInspector should rely on factory, so
* that we can make sure the same ObjectInspector only has one instance. That
* also makes sure hashCode() and equals() method of java.lang.Object directly
* works for ObjectInspector as well.
*/
public interface ObjectInspector extends Cloneable {}
对于基本类型(byte、short、int、long、float、double、boolean、string),可以通过 PrimitiveObjectInspectorFactory 的静态字段直接获取
| Hive 类型 | writable 类型 | Java 包装类型 |
| tinyint | writableByteObjectInspector | javaByteObjectInspector |
| smallint | writableShortObjectInspector | javaShortObjectInspector |
| int | writableIntObjectInspector | javaIntObjectInspector |
| bigint | writableLongObjectInspector | javaLongObjectInspector |
| string | writableStringObjectInspector | javaStringObjectInspector |
| boolean | writableBooleanObjectInspector | javaBooleanObjectInspector |
| float | writableFloatObjectInspector | javaFloatObjectInspector |
| double | writableDoubleObjectInspector | javaDoubleObjectInspector |
注意:基本类型返回值有两种:Writable 类型和 Java 包装类型:
- 在 initialize 指定的返回值类型为 Writable 类型时,在 evaluate() 中 return 的就应该是对应的 Writable 实例
- 在 initialize 指定的返回值类型为 Java 包装类型时,在 evaluate() 中 return 的就应该是对应的 Java 包装类实例
Array、Map<K,V> 等复杂类型,则可以通过 ObjectInspectorFactory 的静态方法获取
| Hive 类型 | ObjectInspectorFactory 的静态方法 | evaluate() 返回值类型 |
| Array | getStandardListObjectInspector(T t) | List |
| Map<K,V> | Map<K,V> |
返回值类型为Map<String,int>的示例:
// 3. 自定义UDF返回值类型为Map<String,int> return ObjectInspectorFactory.getStandardMapObjectInspector( PrimitiveObjectInspectorFactory.javaStringObjectInspector, // Key是String PrimitiveObjectInspectorFactory.javaIntObjectInspector // Value是int );
完整的initialize()函数如下:
/**
* 初始化GenericUDF,每个GenericUDF示例只会调用一次初始化方法
*
* @param arguments
* 自定义UDF参数的ObjectInspector实例
* @throws UDFArgumentException
* 参数个数或类型错误时,抛出该异常
* @return 函数返回值类型
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 1. 参数个数检查,只有一个参数
if (arguments.length != 1) // 函数只接受一个参数
throw new UDFArgumentException("函数需要一个参数"); // 当自定义UDF参数与预期不符时,抛出异常
// 2. 参数类型检查,参数类型为String
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE // 参数是Hive原始类型
|| !PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector)arguments[0]).getPrimitiveCategory())) // 参数是Hive的string类型
throw new UDFArgumentException("函数第一个参数为字符串"); // 当自定义UDF参数与预期不符时,抛出异常
// 3. 自定义UDF返回值类型为Map<String,int>
return ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector, // Key是String
PrimitiveObjectInspectorFactory.javaIntObjectInspector // Value是int
);
}
evaluate()
核心方法,自定义UDF的实现逻辑
代码实现步骤可以分为三部分:
- 参数接收
- 自定义UDF核心逻辑
- 返回处理结果
1. 参数接收
evaluate()的参数就是自定义UDF的参数
/** * Evaluate the GenericUDF with the arguments. * * @param arguments * The arguments as DeferedObject, use DeferedObject.get() to get the * actual argument Object. The Objects can be inspected by the * ObjectInspectors passed in the initialize call. * @return The */ public abstract Object evaluate(DeferredObject[] arguments) throws HiveException;
通过源码注释可知,DeferedObject.get()可以获取参数的值:
/**
* A DeferedObject allows us to do lazy-evaluation and short-circuiting.
* GenericUDF use DeferedObject to pass arguments.
*/
public static interface DeferredObject {
void prepare(int version) throws HiveException;
Object get() throws HiveException;
};
再看看DeferredObject的源码,DeferedObject.get()返回的是Object,传入的参数不同,会是不同的Java类型,以下是Hive常用参数类型对应的Java类型
对于Hive基本类型,传入的都是Writable类型
| Hive类型 | Java类型 |
| tinyint | ByteWritable |
| smallint | ShortWritable |
| int | IntWritable |
| bigint | LongWritable |
| string | Text |
| boolean | BooleanWritable |
| float | FloatWritable |
| double | DoubleWritable |
| Array | ArrayList |
| Map<K,V> | HashMap<K,V> |
参数接收示例:
// 只有一个参数:Map<String,int> // 1. 参数为null时的特殊处理 if (arguments[0] == null) return ... // 2. 接收参数 Map<Text, IntWritable> map = (Map<Text, IntWritable>) arguments[0].get();
2. 自定义UDF核心逻辑
获取参数之后,到这里就是自由发挥了~
3. 返回处理结果
这一步和initialize()的返回值一一对应
基本类型返回值有两种:Writable类型和Java包装类型:
- 在initialize指定的返回值类型为Writable类型时,在evaluate()中return的就应该是对应的Writable实例
- 在initialize指定的返回值类型为Java包装类型时,在evaluate()中return的就应该是对应的Java包装类实例
Hive数组和Map的返回值类型如下:
| Hive类型 | Java类型 |
| Array<T> | List<T> |
| Map<K,V> | Map<K,V> |
getDisplayString()getDisplayString()返回的是 explain 时展示的信息
/** * Get the String to be displayed in explain. */ public abstract String getDisplayString(String[] children);
注意:这里不能 return null,否则可能在运行时抛出空指针异常,而且这个出现这个问题还不容易排查~
ERROR [b1c82c24-bfea-4580-9a0c-ff47d7ef4dbe main] ql.Driver: FAILED: NullPointerException null
java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
...
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
close()
资源关闭回调函数
不是抽象方法,可以不实现
/**
* Close GenericUDF.
* This is only called in runtime of MapRedTask.
*/
@Override
public void close() throws IOException {}
自定义 GenericUDF 完整示例
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class SimpleGenericUDF extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 1.参数个数检查
if (arguments.length != 1) // 函数只接受一个参数
throw new UDFArgumentException("函数需要一个参数"); // 当自定义 UDF 参数与预期不符时,抛出异常
// 2.参数类型检查
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE // 参数是 Hive 原始类型
|| !PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory())) // 参数是 Hive 的 string 类型
throw new UDFArgumentException("函数第一个参数为字符串"); // 当自定义 UDF 参数与预期不符时,抛出异常
// 3.自定义 UDF 返回值类型为 Map<String, int>
return ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector, // Key 是 String
PrimitiveObjectInspectorFactory.javaIntObjectInspector // Value 是 int
);
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
// 1.参数接收
if (arguments[0] == null)
return new HashMap<>();
String str = ((Text) arguments[0].get()).toString();
// 2.自定义 UDF 核心逻辑
// 统计字符串中每个字符的出现次数,并将其记录在 Map 中
Map<String, Integer> map = new HashMap<>();
for (char ch : str.toCharArray()) {
String key = String.valueOf(ch);
Integer count = map.getOrDefault(key, 0);
map.put(key, count + 1);
}
// 3.返回处理结果
return map;
}
@Override
public String getDisplayString(String[] children) {
return "这是一个简单的测试自定义 UDF~";
}
}
UDAF(user-defined aggregate function)
实现 UDAF 需要实现两个类
- apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2:UDAF 入口类,负责参数校验,决定 UDAF 核心逻辑实现类。
- apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator:UDAF 核心逻辑实现类,负责数据聚合。
为了更加直观,本篇文章将以实现计算平均数的案例来讲解
GenericUDAFResolver2
GenericUDAFResolver2 是 UDAF 的入口类,负责参数检验。实现 GenericUDAFResolver2 接口,并实现其方法即可。
public class Avg implements GenericUDAFResolver2 {
/**
* UDAF入口函数
* 负责:
* 1. 参数校验
* 2. 返回 UDAF 核心逻辑实现类
*/
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
ObjectInspector[] parameters = info.getParameterObjectInspectors();
// 1. 参数个数校验
if (parameters.length != 1)
throw new UDFArgumentException("只接受一个参数");
// 2. 参数类型校验
else if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE ||
((PrimitiveObjectInspector)parameters[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT)
throw new UDFArgumentException("第一个参数是int");
// 3. 可以获取参数的其他信息
if (info.isAllColumns()) // 函数参数是否为*
System.out.println("FUNCTION(*)");
if (info.isDistinct()) // 函数参数是否被DISTINCT修饰
System.out.println("FUNCTION(DISTINCT xxx)");
if (info.isWindowing()) // 是否是窗口函数
System.out.println("FUNCTION() OVER(xxx)");
// 3. UDAF核心逻辑实现类
return new AvgEvaluator();
}
/**
* 该方法是用于兼容老的UDAF接口,不用实现
* 如果通过AbstractGenericUDAFResolver实现Resolver,则该方法作为UDAF的入口
*/
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
throw new UDFArgumentException("方法未实现");
}
}
GenericUDAFEvaluator
GenericUDAFEvaluator是UDAF的核心逻辑实现,需要实现的方法较多,而且不同的模式下会调用不同的方法
在实现GenericUDAFEvaluator之前,首先需要理解它的四个模式Mode
GenericUDAFEvaluator内部有一个Mode枚举类,并且有一个对应的成员变量
Mode对应了MapReduce中的一些阶段,其详细信息请见下方代码
/**
* UDAF入口函数类
*/
public abstract class GenericUDAFEvaluator implements Closeable {
/**
* Mode.
*
*/
public static enum Mode {
/**
* 读取原始数据,聚合部分数据,获得部分聚合结果
* 调用:iterate()、terminatePartial()
* 对应Map阶段(不包括Combiner)
*/
PARTIAL1,
/**
* 读取部分聚合结果,再做部分聚合,获得新的部分聚合结果
* 调用:merge()、terminatePartial()
* 对应Map的Combiner阶段
*/
PARTIAL2,
/**
* 读取部分聚合结果,进行全局聚合,获得全局聚合结果
* 调用:merge()、terminate()
* 对应Reduce阶段
*/
FINAL,
/**
* 读取原始数据,直接进行全局聚合,获得全局聚合结果 and
* 调用:iterate()、terminate()
* 对应MapOnly任务,只有Map阶段
*/
COMPLETE
};
Mode mode;
}
各个Mode调用的方法如下:

AggregationBuffer
聚合过程中,用于保存中间结果的Buffer
核心函数
| 函数 | 描述 |
| getNewAggregationBuffer() | 获取一个新的Buffer,用于保存中间计算结果 |
| reset(agg) | 重置Buffer,在Hive程序执行时,可能会复用Buffer实例 |
| init(m, parameters) | 各个模式下,都会调用该方法进行初始化。校验上一阶段的参数,并且决定该阶段的输出 |
| iterate(agg, parameters) | 读取原始数据,计算部分聚合结果 |
| terminatePartial(agg) | 输出部分聚合结果 |
| merge(agg, partial) | 合并部分聚合结果 |
| terminate(agg) | 输出全局聚合结果 |
核心函数的调用过程如下:

实现代码:
/**
* UDAF核心逻辑类
*/
public class AvgEvaluator extends GenericUDAFEvaluator {
/**
* 聚合过程中,用于保存中间结果的Buffer
* 继承AbstractAggregationBuffer
* <p>
* 对于计算平均数,我们首先要计算总和(sum)和总数(count)
* 最后用总和/总数就可以得到平均数
*/
private static class AvgBuffer extends AbstractAggregationBuffer {
// 总和
private Integer sum = 0;
// 总数
private Integer count = 0;
}
/**
* 初始化
*
* @param m 聚合模式
* @param parameters 上一个阶段传过来的参数,可以在这里校验参数:
* 在PARTIAL1和COMPLETE模式,代表原始数据
* 在PARTIAL2和FINAL模式,代表部分聚合结果
* @return 该阶段最终的返回值类型
* 在PARTIAL1和PARTIAL2模式,代表terminatePartial()的返回值类型
* 在FINAL和COMPLETE模式,代表terminate()的返回值类型
*/
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
// 在PARTIAL1和PARTIAL2模式,代表terminatePartial()的返回值类型
// terminatePartial()返回的是部分聚合结果,这时候需要传递sum和count,所以返回类型是结构体
List<ObjectInspector> structFieldObjectInspectors = new LinkedList<ObjectInspector>();
structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList("sum", "count"),
structFieldObjectInspectors
);
} else {
// 在FINAL和COMPLETE模式,代表terminate()的返回值类型
// 该函数最终返回一个double类型的数据,所以这里的返回类型是double
return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
}
/**
* 获取一个新的Buffer,用于保存中间计算结果
*/
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
// 直接实例化一个AvgBuffer
return new AvgBuffer();
}
/**
* 重置Buffer,在Hive程序执行时,可能会复用Buffer实例
*
* @param agg 被重置的Buffer
*/
public void reset(AggregationBuffer agg) throws HiveException {
// 重置AvgBuffer实例的状态
((AvgBuffer)agg).sum = 0;
((AvgBuffer)agg).count = 0;
}
/**
* 读取原始数据,计算部分聚合结果
*
* @param agg 用于保存中间结果
* @param parameters 原始数据
*/
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
if (parameters == null || parameters[0] == null)
return;
if (parameters[0] instanceof IntWritable) {
// 计算总和
((AvgBuffer)agg).sum += ((IntWritable)parameters[0]).get();
// 计算总数
((AvgBuffer)agg).count += 1;
}
}
/**
* 输出部分聚合结果
*
* @param agg 保存的中间结果
* @return 部分聚合结果,不一定是一个简单的值,可能是一个复杂的结构体
*/
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
// 传递中间结果时,必须传递总和、总数
// 这里需要返回一个数组,表示结构体
return new Object[]{
new IntWritable(((AvgBuffer)agg).sum),
new IntWritable(((AvgBuffer)agg).count)
};
}
/**
* 合并部分聚合结果
* 输入:部分聚合结果
* 输出:部分聚合结果
*
* @param agg 当前聚合中间结果类
* @param partial 其他部分聚合结果值
*/
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
// 传递过来的结构体为LazyBinaryStruct类型,需要从中提取数据
((AvgBuffer)agg).sum += ((IntWritable)((LazyBinaryStruct)partial).getField(0)).get();
((AvgBuffer)agg).count += ((IntWritable)((LazyBinaryStruct)partial).getField(1)).get();
}
}
/**
* 输出全局聚合结果
*
* @param agg 保存的中间结果
*/
public Object terminate(AggregationBuffer agg) throws HiveException {
// 总和/总数
return new DoubleWritable(1.0 * ((AvgBuffer)agg).sum / ((AvgBuffer)agg).count);
}
}
UDTF(user-defined table function)
实现自定义UDTF需要继承GenericUDTF,并且实现其三个方法:
- initialize()
- process()
- close()
其中process()、close()为GenericUDTF中的抽象方法,必须实现。initialize()虽然不是抽象方法,但必须手动覆盖实现该方法,因为GenericUDTF的initialize()最终会抛出一个异常:
throw new IllegalStateException("Should not be called directly");
initialize()
需要覆盖实现的方法如下:
public StructObjectInspector initialize(StructObjectInspector argOIs)
throws UDFArgumentException {}
initialize()在函数在GenericUDTF初始化时被调用一次,执行一些初始化操作,包括:
- 判断函数参数个数
- 判断函数参数类型
- 确定函数返回值类型
除此之外,用户在这里还可以做一些自定义的初始化操作,比如初始化HDFS客户端等1.判断函数参数个数
initialize()的参数为StructObjectInspector argOIs
可以通过如下方式获取自定义UDTF的所有参数
List<? extends StructField> inputFieldRef = argOIs.getAllStructFieldRefs();
判断参数个数的方式很简单,只要判断inputFieldRef的元素个数即可。
示例:
List<? extends StructField> inputFieldRef = argOIs.getAllStructFieldRefs();
//参数个数为1
if(inputFieldRef.size() != 1)
throw new UDFArgumentException("需要一个参数");
2.判断函数参数类型
inputFieldRef的元素类型是StructField,可以通过StructField获取参数类型ObjectInspector
判断参数个数和类型的示例:
//1.判断参数个数,只有一个参数
List<? extends StructField> inputFieldRef = argOIs.getAllStructFieldRefs();
if(inputFieldRef.size() != 1)
throw new UDFArgumentException("需要一个参数");
//2.判断参数类型,参数类型为string
ObjectInspector objectInspector = inputFieldRef.get(0).getFieldObjectInspector();
if(objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE //参数是Hive原始类型
||!PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector)objectInspector).getPrimitiveCategory()))//参数是Hive的string类型
throw new UDFArgumentException("函数第一个参数为字符串");//当自定义UDF参数与预期不符时,抛出异常
3.确定函数返回值类型
UDTF函数可以对于一行输入,可以产生多行输出,并且每行结果可以有多列。自定义UDTF的返回值类型会稍微复杂些,需要明确输出结果的所有列名和列类型
initialize()方法的返回值类型为StructObjectInspector
StructObjectInspector表示了一行记录的结构,可以包括多个列。每个列有列名、列类型和列注释(可选)
可以通过ObjectInspectorFactory来获取StructObjectInspector实例:
/**
*structFieldNames:列
*/
ObjectInspectorFactory.getStandardStructObjectInspector(
List<String> structFieldNames,
List<ObjectInspector> structFieldObjectInspectors)
structFieldNames的第n个元素,代表了第n列的名称;structFieldObjectInspectors的第n个元素,代表了第n列的类型。
structFieldNames和structFieldObjectInspectors应该保持长度一致
//只有一列,列的类型为Map<String,int>
return ObjectInspectorFactory.getStandardStructObjectInspector(
Collections.singletonList("result_column_name"),
Collections.singletonList(
ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector,//Key是String
PrimitiveObjectInspectorFactory.javaIntObjectInspector//Value是int
)
)
);
process()
核心方法,自定义UDTF的实现逻辑
代码实现步骤可以分为三部分:
- 参数接收
- 自定义UDTF核心逻辑
- 输出结果
/** *Give a set of arguments for the UDTF to process. * *@param args *object array of arguments */ public abstract void process(Object[] args) throws HiveException;
1.参数接收
args即是自定义UDTF的参数,传入的参数不同,会是不同的Java类型,以下是Hive常用参数类型对应的Java类型
| Hive类型 | Java类型 |
| tinyint | ByteWritable |
| smallint | ShortWritable |
| int | IntWritable |
| bigint | LongWritable |
| string | Text |
| boolean | BooleanWritable |
| float | FloatWritable |
| double | DoubleWritable |
| Array | ArrayList |
| Map<K,V> | HashMap<K,V> |
参数接收示例:
//参数null值的特殊处理 if(args[0] == null) return; //接收参数 String str = ((Text)args[0]).toString();
2.自定义UDTF核心逻辑
获取参数之后,到这里就是自由发挥了~
3.输出结果
process()方法本身没有返回值,通过GenericUDTF中的forward()输出一行结果。forward()可以反复调用,可以输出任意行结果
/**
*Passes an output row to the collector.
*
*@param o
*@throws HiveException
*/
protected final void forward(Object o) throws HiveException {
collector.collect(o);
}
forward()可以接收List或Java数组,第n个元素代表第n列的值
List<Object> list = new LinkedList<>();
//第一列是int
list.add(1);
//第二列是string
list.add("hello");
//第三列是boolean
list.add(true);
//输出一行结果
forward(list);
close()
没有其他输入行时,调用该函数
可以进行一些资源关闭处理等最终处理
UDF相关语法
UDF使用需要将编写的UDF类编译为jar包添加到Hive中,根据需要创建临时函数或永久函数。
resources操作
Hive支持向会话中添加资源,支持文件、jar、存档,添加后即可在sql中直接引用,仅当前会话有效,默认读取本地路径,支持hdfs等,路径不加引号。例:add jar /opt/ht/AddUDF.jar;
#添加资源
ADD {FILE[S]|JAR[S]|ARCHIVE[S]} <filepath1> [<filepath2>]*
#查看资源
LIST {FILE[S]|JAR[S]|ARCHIVE[S]} [<filepath1> <filepath2>..]
#删除资源
DELETE {FILE[S]|JAR[S]|ARCHIVE[S]} [<filepath1> <filepath2>..]
临时函数
仅当前会话有效,不支持指定数据库,USING路径需加引号。
CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']]; DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
永久函数
函数信息入库,永久有效,USING路径需加引号。临时函数与永久函数均可使用USING语句,Hive会自动添加指定文件到当前环境中,效果与add语句相同,执行后即可list查看已添加的文件或jar包。
CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']]; DROP FUNCTION [IF EXISTS] function_name; RELOAD (FUNCTIONS|FUNCTION);
查看函数
#查看所有函数,不区分临时函数与永久函数 show functions; #函数模糊查询,此处为查询x开头的函数 show functions like 'x*'; #查看函数描述 desc function function_name; #查看函数详细描述 desc function extended function_name;
escription注解
Hive已定义注解类型org.apache.hadoop.hive.ql.exec.Description,用于执行descfunction[extended]function_name时介绍函数功能,内置函数与自定义函数用法相同。
若Description注解名称与创建UDF时指定名称不同,以创建UDF时指定名称为准。
public @interface Description {
//函数简单介绍
String value() default "_FUNC_is undocumented";
//函数详细使用说明
String extended() default "";
//函数名称
String name() default "";
}
Hive UDF实战—经纬度编码
在Idea中,新建一个Maven项目,命名为: GeoUtilsUdf。
pom.xml文件的修改
修改调整后的pom.xml:
4.0.0 com.ly GeoUtilsUdf 1.0-SNAPSHOT org.apache.hive hive-exec 1.1.0 provided org.apache.hadoop hadoop-common 2.6.0 jdk.tools jdk.tools com.github.davidmoten geo 0.8.0 com.uber h3 4.1.1 org.apache.maven.plugins maven-assembly-plugin jar-with-dependencies make-jar-with-dependencies package single org.apache.maven.plugins maven-compiler-plugin 8 8
pom.xml节点说明:
1.引入hive及hadoop依赖
org.apache.hive hive-exec 1.1.0 provided org.apache.hadoop hadoop-common 2.6.0 jdk.tools jdk.tools
注意,版本必须与线上的Hive和Hadoop版本一致。另外hive-exec的scope设为provided。因为Hive的lib目录下已经包括了hive-exec的jar,Hive会自动将其加载。所以这里就不需要再打包进入jar。
2.添加外部依赖,不重复造轮子。
使用外部已有的java包,一个是Geohash,另外一个是uberh3。
com.github.davidmoten geo 0.8.0 com.uber h3 4.1.1
3.打包设置,将外部依赖打包进入jar。
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-jar-with-dependencies</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build>
编写Java代码
在src/main/java目录下新建两个Package,并在分别为:
- hive.udf.geohash
- hive.udf.h3
在com.hive.udf.geohash下先建2个JavaClass:
- UDFGeohashEncode(将经纬度转化为geohash)
- UDFGeohashDecode(将geohash转化为经纬度)
代码如下:
# UDFGeohashEncode.java
package com.hive.udf.geohash;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.github.davidmoten.geo.GeoHash;
public class UDFGeohashEncode extends UDF {
public Text evaluate(DoubleWritable longitude, DoubleWritable latitude, IntWritable precision){
if(latitude == null || longitude == null || precision == null){
return null;
}
return new Text(GeoHash.encodeHash(latitude.get(), longitude.get(), precision.get()));
}
}
# UDFGeohashDecode.java
package com.hive.udf.geohash;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import com.github.davidmoten.geo.GeoHash;
import com.github.davidmoten.geo.LatLong;
public class UDFGeohashDecode extends UDF {
public Text evaluate(Text geohash){
if(geohash == null){
return null;
}
LatLong location = GeoHash.decodeHash(geohash.toString());
return new Text(location.getLon() + "," + location.getLat());
}
}
在com.hive.udf.h3下先建3个JavaClass:
- UDFH3Encode(将经纬度转化为H3)
- UDFH3ToGeo(将H3转化为经纬度)
- UDFH3ToGeoBoundary(将H3转化为6个顶点的经纬度)
代码示例:
#UDFH3Encode.java
package com.hive.udf.h3;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.uber.h3core.H3Core;
import java.io.IOException;
public class UDFH3Encode extends UDF {
public Text evaluate(DoubleWritable longitude, DoubleWritable latitude, IntWritable precision){
H3Core h3 = null;
try{
h3 = H3Core.newInstance();
}catch(IOException e){
e.printStackTrace();
}
if(latitude == null || longitude == null || precision == null){
return null;
}
return new Text(h3.latLngToCellAddress(latitude.get(), longitude.get(), precision.get()));
}
}
#UDFH3ToGeo.java
package com.hive.udf.h3;
import com.uber.h3core.H3Core;
import com.uber.h3core.util.LatLng;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.HashMap;
public class UDFH3ToGeo extends GenericUDF {
private ObjectInspectorConverters.Converter[] converters;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
if (arguments.length != 1) {
throw new UDFArgumentLengthException(
“The function UDFH3ToGeo(s) takes exactly 1 arguments.”);
}
converters = new ObjectInspectorConverters.Converter[arguments.length];
converters[0] = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.writableStringObjectInspector);
return ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector,
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector);
}
@Override
public HashMap
assert (arguments.length == 1);
H3Core h3 = null;
Text hexAddr = (Text) converters[0].convert(arguments[0].get());
if (hexAddr == null) {
return null;
}
try {
h3 = H3Core.newInstance();
} catch (IOException e) {
e.printStackTrace();
}
LatLng coords = h3.cellToLatLng(String.valueOf(hexAddr));
HashMap
try {
map_double.put(“lng”, coords.lng);
map_double.put(“lat”, coords.lat);
} catch (Exception ex) {
ex.printStackTrace();
}
return map_double;
}
@Override
public String getDisplayString(String[] strings) {
return null;
}
}
#UDFH3ToGeoBoundary.java
package com.hive.udf.h3;
import com.uber.h3core.H3Core;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class UDFH3ToGeoBoundary extends GenericUDF {
private ObjectInspectorConverters.Converter[] converters;
private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
private transient ArrayList<Object> ret = new ArrayList<Object>();
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
if (arguments.length != 1) {
throw new UDFArgumentLengthException(
"The function UDFH3ToGeoBoundary(s) takes exactly 1 arguments.");
}
converters = new ObjectInspectorConverters.Converter[arguments.length];
converters[0] = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.javaStringObjectInspector);
ObjectInspector returnOI =
returnOIResolver.get(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == 1);
H3Core h3 = null;
String hexAddr = (String) converters[0].convert(arguments[0].get());
if (hexAddr == null) {
return null;
}
try {
h3 = H3Core.newInstance();
} catch (IOException e) {
e.printStackTrace();
}
List<com.uber.h3core.util.LatLng> geoCoords = h3.cellToBoundary(hexAddr);
ret.clear();
for (com.uber.h3core.util.LatLng str : geoCoords) {
StringBuilder sb = new StringBuilder();
sb.append(str.lng);
sb.append(",");
sb.append(str.lat);
ret.add(sb);
}
return ret;
}
public String getDisplayString(String[] strings) {
return null;
}
}
编译生成Jar包
命令行执行:mvn clean install
执行完成后会在 target 目录下生成 GeoUtilsUdf-1.0-SNAPSHOT-jar-with-dependencies.jar 文件,此文件就是最终我们要使用的 jar 包。
完成后上传到线上的目录。使用示例:
CREATE TEMPORARY FUNCTION geotoh3address AS 'com.hive.udf.h3.UDFH3Encode' USING JAR 'viewfs://dcfs/ns-common/car/dev/udf_jar/GeoUtilsUdf-1.0-SNAPSHOT-jar-with-dependencies.jar'; SELECT geotoh3address(CAST(lng AS DOUBLE), CAST(lat AS DOUBLE), 7) AS h3hex FROM testdb.test_table LIMIT 10
不重复造轮子,以下一些开源的 Hive UDF 可以复制编译后使用:
- https://github.com/brndnmtthws/facebook-hive-udfs
- https://github.com/klout/brickhouse
- https://github.com/nexr/hive-udf
- https://github.com/aaronshan/hive-third-functions
- https://github.com/yahoo/hive-funnel-udf
- https://github.com/dataiku/dataiku-hive-udf
- https://github.com/petrabarus/HiveUDFs
- https://github.com/Spuul/hive-udfs
- https://github.com/jet2007/jet-hive-udf
- https://github.com/rweald/hive-udfs
- https://github.com/rueedlinger/hive-udf
- https://github.com/ychantit/fuzzymatch_hiveUDF
- https://github.com/Spuul/hive-udfs
参考链接:



