器→工具, 开源项目, 数据, 术→技巧, 编程语言

Hive UDF的开发简介

钱魏Way · · 336 次浏览
!文章内容如有错误或排版问题,请提交反馈,非常感谢!

Hive内置了很多函数,可以参考Hive Built-In Functions。但是有些情况下,这些内置函数还是不能满足我们的需求,这时候就需要UDF出场了。

UDF全称:User-Defined Functions,即用户自定义函数,在Hive SQL编译成MapReduce任务时,执行java方法,类似于像MapReduce执行过程中加入一个插件,方便扩展。

Hive三种自定义函数:

  • UDF(user-defined function)一进一出,给定一个输入,输出一个处理后的数据。许多Hive内置字符串,数学函数,时间函数都是这种类型。大多数情况下编写对应功能的处理函数就能满足需求。如:concat, split, length, rand等。这种UDF主要有两种写法:继承实现UDF类和继承GenericUDF类(通用UDF)。
  • UDAF(user-defined aggregate function)多进一出,属于聚合函数,类似于count、sum等函数。一般配合group by使用。主要用于累加操作,常见的函数有max, min, count, sum, collect_set等。这种UDF主要有两种写法:继承实现UDAF类和继承实现AbstractGenericUDAFResolver类。
  • UDTF(user-defined table function)一进多出,将输入的一行数据产生多行数据或者将一列打成多列。如explode函数通常配合lateral view使用,实现列转行的功能。parse_url_tuple将一列转为多列。

在决定编写自己的函数之前,可以先看看Hive中已经有了哪些函数及其功能:

  • show functions; –查看已有函数
  • describe function concat; –查看函数用法
  • describe function extended concat; –查看函数的用法和示例

UDF(user-defined function)

Hive提供了两个实现UDF的方式:

第一种:继承UDF类

优点:

  • 实现简单
  • 支持Hive的基本类型、数组和Map
  • 支持函数重载

缺点:

  • 逻辑较为简单,只适合用于实现简单的函数

这种方式编码少,代码逻辑清晰,可以快速实现简单的UDF

第一种方式的代码实现最为简单,只需新建一个类继承UDF,然后编写evaluate()即可。

import org.apache.hadoop.hive.ql.exec.UDF;

/**
* 继承org.apache.hadoop.hive.ql.exec.UDF
*/
public class SimpleUDF extends UDF {

/**
* 编写一个函数,要求如下:
* 1.函数名必须为evaluate
* 2.参数和返回值类型可以为:Java基本类型、Java包装类、org.apache.hadoop.io.Writable等类型、List、Map
* 3.函数一定要有返回值,不能为void
*/
public int evaluate(int a, int b) {
return a + b;
}

/**
* 支持函数重载
*/
public Integer evaluate(Integer a, Integer b, Integer c) {
if(a == null || b == null || c == null)
return 0;

return a + b + c;
}
}

继承UDF类的方式非常简单,但还有一些需要注意的地方:

  • evaluate()方法并不是继承自UDF类
  • evaluate()的返回值类型不能为void

支持hive基本类型、数组和MapHive基本类型

Java可以使用Java原始类型、Java包装类或对应的Writable类对于基本类型,最好不要使用Java原始类型,当null传给Java原始类型参数时,UDF会报错。Java包装类还可以用于null值判断

Hive类型 Java原始类型 Java包装类 hadoop.io.Writable
tinyint byte Byte ByteWritable
smallint short Short ShortWritable
int int Integer IntWritable
bigint long Long LongWritable
string String Text
boolean boolean Boolean BooleanWritable
float float Float FloatWritable
double double Double DoubleWritable

数组和Map

Hive类型 Java类型
array List
Map<K,V> Map<K,V>

第二种:继承GenericUDF类

优点:

  • 支持任意长度、任意类型的参数
  • 可以根据参数个数和类型实现不同的逻辑
  • 可以实现初始化和关闭资源的逻辑(initialize、close)

缺点:

  • 实现比继承UDF要复杂一些

与继承UDF相比,GenericUDF更加灵活,可以实现更为复杂的函数

继承GenericUDF后,必须实现其三个方法:

  • initialize()
  • evaluate()
  • getDisplayString()

initialize()

/**
 * 初始化 GenericUDF,每个 GenericUDF 示例只会调用一次初始化方法
 *
 * @param arguments
 * 自定义 UDF 参数的 ObjectInspector 实例
 * @throws UDFArgumentException
 * 参数个数或类型错误时,抛出该异常
 * @return 函数返回值类型
 */
public abstract ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException;

initialize() 在函数在 GenericUDF 初始化时被调用一次,执行一些初始化操作,包括:

  • 判断函数参数个数
  • 判断函数参数类型
  • 确定函数返回值类型

除此之外,用户在这里还可以做一些自定义的初始化操作,比如初始化 HDFS 客户端等1. 判断函数参数个数

可通过 arguments 数组的长度来判断函数参数的个数

判断函数参数个数示例:

// 1. 参数个数检查,只有一个参数
if (arguments.length != 1) // 函数只接受一个参数
throw new UDFArgumentException("函数需要一个参数"); // 当自定义 UDF 参数与预期不符时,抛出异常

2. 判断函数参数类型

ObjectInspector 可用于侦测参数数据类型,其内部有一个枚举类 Category,代表了当前 ObjectInspector 的类型

public interface ObjectInspector extends Cloneable {
public static enum Category {
PRIMITIVE, // Hive 原始类型
LIST, // Hive 数组
MAP, // Hive Map
STRUCT, // 结构体
UNION // 联合体
};
}

Hive 原始类型又细分了多种子类型,PrimitiveObjectInspector 实现了 ObjectInspector,可以更加具体的表示对应的 Hive 原始类型

public interface PrimitiveObjectInspector extends ObjectInspector {

/**
 * The primitive types supported by Hive.
 */
public static enum PrimitiveCategory {
VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
UNKNOWN
};
}

参数类型判断示例:

// 2. 参数类型检查,参数类型为 String
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE // 参数是 Hive 原始类型
|| !PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector)arguments[0]).getPrimitiveCategory())) // 参数是 Hive 的 string 类型
throw new UDFArgumentException("函数第一个参数为字符串"); // 当自定义 UDF 参数与预期不符时,抛出异常

3. 确定函数返回值类型

initialize() 需要 return 一个 ObjectInspector 实例,用于表示自定义 UDF 返回值类型。initialize() 的返回值决定了 evaluate() 的返回值类型

ObjectInspector 的源码中,有这么一段注释,其大意是 ObjectInspector 的实例应该由对应的工厂类获取,以保证实例的单例等属性

/**
 * An efficient implementation of ObjectInspector should rely on factory, so
 * that we can make sure the same ObjectInspector only has one instance. That
 * also makes sure hashCode() and equals() method of java.lang.Object directly
 * works for ObjectInspector as well.
 */
public interface ObjectInspector extends Cloneable {}

对于基本类型(byte、short、int、long、float、double、boolean、string),可以通过 PrimitiveObjectInspectorFactory 的静态字段直接获取

Hive 类型 writable 类型 Java 包装类型
tinyint writableByteObjectInspector javaByteObjectInspector
smallint writableShortObjectInspector javaShortObjectInspector
int writableIntObjectInspector javaIntObjectInspector
bigint writableLongObjectInspector javaLongObjectInspector
string writableStringObjectInspector javaStringObjectInspector
boolean writableBooleanObjectInspector javaBooleanObjectInspector
float writableFloatObjectInspector javaFloatObjectInspector
double writableDoubleObjectInspector javaDoubleObjectInspector

注意:基本类型返回值有两种:Writable 类型和 Java 包装类型:

  • 在 initialize 指定的返回值类型为 Writable 类型时,在 evaluate() 中 return 的就应该是对应的 Writable 实例
  • 在 initialize 指定的返回值类型为 Java 包装类型时,在 evaluate() 中 return 的就应该是对应的 Java 包装类实例

Array、Map<K,V> 等复杂类型,则可以通过 ObjectInspectorFactory 的静态方法获取

getStandardMapObjectInspector(K k, V v);

Hive 类型 ObjectInspectorFactory 的静态方法 evaluate() 返回值类型
Array getStandardListObjectInspector(T t) List
Map<K,V> Map<K,V>

返回值类型为Map<String,int>的示例:

// 3. 自定义UDF返回值类型为Map<String,int>
return ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector, // Key是String
PrimitiveObjectInspectorFactory.javaIntObjectInspector // Value是int
);

完整的initialize()函数如下:

/**
* 初始化GenericUDF,每个GenericUDF示例只会调用一次初始化方法
*
* @param arguments
* 自定义UDF参数的ObjectInspector实例
* @throws UDFArgumentException
* 参数个数或类型错误时,抛出该异常
* @return 函数返回值类型
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 1. 参数个数检查,只有一个参数
if (arguments.length != 1) // 函数只接受一个参数
throw new UDFArgumentException("函数需要一个参数"); // 当自定义UDF参数与预期不符时,抛出异常

// 2. 参数类型检查,参数类型为String
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE // 参数是Hive原始类型
|| !PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector)arguments[0]).getPrimitiveCategory())) // 参数是Hive的string类型
throw new UDFArgumentException("函数第一个参数为字符串"); // 当自定义UDF参数与预期不符时,抛出异常

// 3. 自定义UDF返回值类型为Map<String,int>
return ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector, // Key是String
PrimitiveObjectInspectorFactory.javaIntObjectInspector // Value是int
);
}

evaluate()

核心方法,自定义UDF的实现逻辑

代码实现步骤可以分为三部分:

  • 参数接收
  • 自定义UDF核心逻辑
  • 返回处理结果

1. 参数接收

evaluate()的参数就是自定义UDF的参数

/**
* Evaluate the GenericUDF with the arguments.
*
* @param arguments
* The arguments as DeferedObject, use DeferedObject.get() to get the
* actual argument Object. The Objects can be inspected by the
* ObjectInspectors passed in the initialize call.
* @return The
*/
public abstract Object evaluate(DeferredObject[] arguments)
throws HiveException;

通过源码注释可知,DeferedObject.get()可以获取参数的值:

/**
* A DeferedObject allows us to do lazy-evaluation and short-circuiting.
* GenericUDF use DeferedObject to pass arguments.
*/
public static interface DeferredObject {
void prepare(int version) throws HiveException;
Object get() throws HiveException;
};

再看看DeferredObject的源码,DeferedObject.get()返回的是Object,传入的参数不同,会是不同的Java类型,以下是Hive常用参数类型对应的Java类型

对于Hive基本类型,传入的都是Writable类型

Hive类型 Java类型
tinyint ByteWritable
smallint ShortWritable
int IntWritable
bigint LongWritable
string Text
boolean BooleanWritable
float FloatWritable
double DoubleWritable
Array ArrayList
Map<K,V> HashMap<K,V>

参数接收示例:

// 只有一个参数:Map<String,int>

// 1. 参数为null时的特殊处理
if (arguments[0] == null)
return ...

// 2. 接收参数
Map<Text, IntWritable> map = (Map<Text, IntWritable>) arguments[0].get();

2. 自定义UDF核心逻辑

获取参数之后,到这里就是自由发挥了~

3. 返回处理结果

这一步和initialize()的返回值一一对应

基本类型返回值有两种:Writable类型和Java包装类型:

  • 在initialize指定的返回值类型为Writable类型时,在evaluate()中return的就应该是对应的Writable实例
  • 在initialize指定的返回值类型为Java包装类型时,在evaluate()中return的就应该是对应的Java包装类实例

Hive数组和Map的返回值类型如下:

Hive类型 Java类型
Array<T> List<T>
Map<K,V> Map<K,V>

getDisplayString()getDisplayString()返回的是 explain 时展示的信息

/**
 * Get the String to be displayed in explain.
 */
public abstract String getDisplayString(String[] children);

注意:这里不能 return null,否则可能在运行时抛出空指针异常,而且这个出现这个问题还不容易排查~

ERROR [b1c82c24-bfea-4580-9a0c-ff47d7ef4dbe main] ql.Driver: FAILED: NullPointerException null
java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    ...
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

close()

资源关闭回调函数

不是抽象方法,可以不实现

/**
 * Close GenericUDF.
 * This is only called in runtime of MapRedTask.
 */
@Override
public void close() throws IOException {}

自定义 GenericUDF 完整示例

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SimpleGenericUDF extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1.参数个数检查
        if (arguments.length != 1) // 函数只接受一个参数
            throw new UDFArgumentException("函数需要一个参数"); // 当自定义 UDF 参数与预期不符时,抛出异常

        // 2.参数类型检查
        if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE // 参数是 Hive 原始类型
                || !PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory())) // 参数是 Hive 的 string 类型
            throw new UDFArgumentException("函数第一个参数为字符串"); // 当自定义 UDF 参数与预期不符时,抛出异常

        // 3.自定义 UDF 返回值类型为 Map<String, int>
        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector, // Key 是 String
                PrimitiveObjectInspectorFactory.javaIntObjectInspector // Value 是 int
        );
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // 1.参数接收
        if (arguments[0] == null)
            return new HashMap<>();
        String str = ((Text) arguments[0].get()).toString();

        // 2.自定义 UDF 核心逻辑
        // 统计字符串中每个字符的出现次数,并将其记录在 Map 中
        Map<String, Integer> map = new HashMap<>();
        for (char ch : str.toCharArray()) {
            String key = String.valueOf(ch);
            Integer count = map.getOrDefault(key, 0);
            map.put(key, count + 1);
        }

        // 3.返回处理结果
        return map;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "这是一个简单的测试自定义 UDF~";
    }
}

UDAF(user-defined aggregate function)

实现 UDAF 需要实现两个类

  • apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2:UDAF 入口类,负责参数校验,决定 UDAF 核心逻辑实现类。
  • apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator:UDAF 核心逻辑实现类,负责数据聚合。

为了更加直观,本篇文章将以实现计算平均数的案例来讲解

GenericUDAFResolver2

GenericUDAFResolver2 是 UDAF 的入口类,负责参数检验。实现 GenericUDAFResolver2 接口,并实现其方法即可。

public class Avg implements GenericUDAFResolver2 {

/**
 * UDAF入口函数
 * 负责:
 * 1. 参数校验
 * 2. 返回 UDAF 核心逻辑实现类
 */
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    ObjectInspector[] parameters = info.getParameterObjectInspectors();

    // 1. 参数个数校验
    if (parameters.length != 1)
        throw new UDFArgumentException("只接受一个参数");

    // 2. 参数类型校验
    else if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE ||
        ((PrimitiveObjectInspector)parameters[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT)
        throw new UDFArgumentException("第一个参数是int");

    // 3. 可以获取参数的其他信息
    if (info.isAllColumns()) // 函数参数是否为*
        System.out.println("FUNCTION(*)");
    if (info.isDistinct()) // 函数参数是否被DISTINCT修饰
        System.out.println("FUNCTION(DISTINCT xxx)");
    if (info.isWindowing()) // 是否是窗口函数
        System.out.println("FUNCTION() OVER(xxx)");

    // 3. UDAF核心逻辑实现类
    return new AvgEvaluator();
}

/**
 * 该方法是用于兼容老的UDAF接口,不用实现
 * 如果通过AbstractGenericUDAFResolver实现Resolver,则该方法作为UDAF的入口
 */
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
    throw new UDFArgumentException("方法未实现");
}
}

GenericUDAFEvaluator

GenericUDAFEvaluator是UDAF的核心逻辑实现,需要实现的方法较多,而且不同的模式下会调用不同的方法

在实现GenericUDAFEvaluator之前,首先需要理解它的四个模式Mode

GenericUDAFEvaluator内部有一个Mode枚举类,并且有一个对应的成员变量

Mode对应了MapReduce中的一些阶段,其详细信息请见下方代码

/**
 * UDAF入口函数类
 */
public abstract class GenericUDAFEvaluator implements Closeable {

    /**
     * Mode.
     *
     */
    public static enum Mode {
        /**
         * 读取原始数据,聚合部分数据,获得部分聚合结果
         * 调用:iterate()、terminatePartial()
         * 对应Map阶段(不包括Combiner)
         */
        PARTIAL1,
        /**
         * 读取部分聚合结果,再做部分聚合,获得新的部分聚合结果
         * 调用:merge()、terminatePartial()
         * 对应Map的Combiner阶段
         */
        PARTIAL2,
        /**
         * 读取部分聚合结果,进行全局聚合,获得全局聚合结果
         * 调用:merge()、terminate()
         * 对应Reduce阶段
         */
        FINAL,
        /**
         * 读取原始数据,直接进行全局聚合,获得全局聚合结果 and
         * 调用:iterate()、terminate()
         * 对应MapOnly任务,只有Map阶段
         */
        COMPLETE
    };

    Mode mode;
}

各个Mode调用的方法如下:

AggregationBuffer

聚合过程中,用于保存中间结果的Buffer

核心函数

函数 描述
getNewAggregationBuffer() 获取一个新的Buffer,用于保存中间计算结果
reset(agg) 重置Buffer,在Hive程序执行时,可能会复用Buffer实例
init(m, parameters) 各个模式下,都会调用该方法进行初始化。校验上一阶段的参数,并且决定该阶段的输出
iterate(agg, parameters) 读取原始数据,计算部分聚合结果
terminatePartial(agg) 输出部分聚合结果
merge(agg, partial) 合并部分聚合结果
terminate(agg) 输出全局聚合结果

核心函数的调用过程如下:

实现代码:

/**
 * UDAF核心逻辑类
 */
public class AvgEvaluator extends GenericUDAFEvaluator {

/**
 * 聚合过程中,用于保存中间结果的Buffer
 * 继承AbstractAggregationBuffer
 * <p>
 * 对于计算平均数,我们首先要计算总和(sum)和总数(count)
 * 最后用总和/总数就可以得到平均数
 */
private static class AvgBuffer extends AbstractAggregationBuffer {
    // 总和
    private Integer sum = 0;
    // 总数
    private Integer count = 0;
}

/**
 * 初始化
 *
 * @param m 聚合模式
 * @param parameters 上一个阶段传过来的参数,可以在这里校验参数:
 * 在PARTIAL1和COMPLETE模式,代表原始数据
 * 在PARTIAL2和FINAL模式,代表部分聚合结果
 * @return 该阶段最终的返回值类型
 * 在PARTIAL1和PARTIAL2模式,代表terminatePartial()的返回值类型
 * 在FINAL和COMPLETE模式,代表terminate()的返回值类型
 */
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
    super.init(m, parameters);
    if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
        // 在PARTIAL1和PARTIAL2模式,代表terminatePartial()的返回值类型
        // terminatePartial()返回的是部分聚合结果,这时候需要传递sum和count,所以返回类型是结构体
        List<ObjectInspector> structFieldObjectInspectors = new LinkedList<ObjectInspector>();
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("sum", "count"),
            structFieldObjectInspectors
        );
    } else {
        // 在FINAL和COMPLETE模式,代表terminate()的返回值类型
        // 该函数最终返回一个double类型的数据,所以这里的返回类型是double
        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
    }
}

/**
 * 获取一个新的Buffer,用于保存中间计算结果
 */
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    // 直接实例化一个AvgBuffer
    return new AvgBuffer();
}

/**
 * 重置Buffer,在Hive程序执行时,可能会复用Buffer实例
 *
 * @param agg 被重置的Buffer
 */
public void reset(AggregationBuffer agg) throws HiveException {
    // 重置AvgBuffer实例的状态
    ((AvgBuffer)agg).sum = 0;
    ((AvgBuffer)agg).count = 0;
}

/**
 * 读取原始数据,计算部分聚合结果
 *
 * @param agg 用于保存中间结果
 * @param parameters 原始数据
 */
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
    if (parameters == null || parameters[0] == null)
        return;

    if (parameters[0] instanceof IntWritable) {
        // 计算总和
        ((AvgBuffer)agg).sum += ((IntWritable)parameters[0]).get();
        // 计算总数
        ((AvgBuffer)agg).count += 1;
    }
}

/**
 * 输出部分聚合结果
 *
 * @param agg 保存的中间结果
 * @return 部分聚合结果,不一定是一个简单的值,可能是一个复杂的结构体
 */
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
    // 传递中间结果时,必须传递总和、总数
    // 这里需要返回一个数组,表示结构体
    return new Object[]{
        new IntWritable(((AvgBuffer)agg).sum),
        new IntWritable(((AvgBuffer)agg).count)
    };
}

/**
 * 合并部分聚合结果
 * 输入:部分聚合结果
 * 输出:部分聚合结果
 *
 * @param agg 当前聚合中间结果类
 * @param partial 其他部分聚合结果值
 */
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    if (partial != null) {
        // 传递过来的结构体为LazyBinaryStruct类型,需要从中提取数据
        ((AvgBuffer)agg).sum += ((IntWritable)((LazyBinaryStruct)partial).getField(0)).get();
        ((AvgBuffer)agg).count += ((IntWritable)((LazyBinaryStruct)partial).getField(1)).get();
    }
}

/**
 * 输出全局聚合结果
 *
 * @param agg 保存的中间结果
 */
public Object terminate(AggregationBuffer agg) throws HiveException {
    // 总和/总数
    return new DoubleWritable(1.0 * ((AvgBuffer)agg).sum / ((AvgBuffer)agg).count);
}
}

UDTF(user-defined table function)

实现自定义UDTF需要继承GenericUDTF,并且实现其三个方法:

  • initialize()
  • process()
  • close()

其中process()、close()为GenericUDTF中的抽象方法,必须实现。initialize()虽然不是抽象方法,但必须手动覆盖实现该方法,因为GenericUDTF的initialize()最终会抛出一个异常:

throw new IllegalStateException("Should not be called directly");

initialize()

需要覆盖实现的方法如下:

public StructObjectInspector initialize(StructObjectInspector argOIs)
throws UDFArgumentException {}

initialize()在函数在GenericUDTF初始化时被调用一次,执行一些初始化操作,包括:

  • 判断函数参数个数
  • 判断函数参数类型
  • 确定函数返回值类型

除此之外,用户在这里还可以做一些自定义的初始化操作,比如初始化HDFS客户端等1.判断函数参数个数

initialize()的参数为StructObjectInspector argOIs

可以通过如下方式获取自定义UDTF的所有参数

List<? extends StructField> inputFieldRef = argOIs.getAllStructFieldRefs();

判断参数个数的方式很简单,只要判断inputFieldRef的元素个数即可。

示例:

List<? extends StructField> inputFieldRef = argOIs.getAllStructFieldRefs();
//参数个数为1
if(inputFieldRef.size() != 1)
throw new UDFArgumentException("需要一个参数");

2.判断函数参数类型

inputFieldRef的元素类型是StructField,可以通过StructField获取参数类型ObjectInspector

判断参数个数和类型的示例:

//1.判断参数个数,只有一个参数
List<? extends StructField> inputFieldRef = argOIs.getAllStructFieldRefs();
if(inputFieldRef.size() != 1)
throw new UDFArgumentException("需要一个参数");

//2.判断参数类型,参数类型为string
ObjectInspector objectInspector = inputFieldRef.get(0).getFieldObjectInspector();
if(objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE //参数是Hive原始类型
||!PrimitiveObjectInspector.PrimitiveCategory.STRING.equals(((PrimitiveObjectInspector)objectInspector).getPrimitiveCategory()))//参数是Hive的string类型
throw new UDFArgumentException("函数第一个参数为字符串");//当自定义UDF参数与预期不符时,抛出异常

3.确定函数返回值类型

UDTF函数可以对于一行输入,可以产生多行输出,并且每行结果可以有多列。自定义UDTF的返回值类型会稍微复杂些,需要明确输出结果的所有列名和列类型

initialize()方法的返回值类型为StructObjectInspector

StructObjectInspector表示了一行记录的结构,可以包括多个列。每个列有列名、列类型和列注释(可选)

可以通过ObjectInspectorFactory来获取StructObjectInspector实例:

/**
*structFieldNames:列
*/
ObjectInspectorFactory.getStandardStructObjectInspector(
List<String> structFieldNames,
List<ObjectInspector> structFieldObjectInspectors)
structFieldNames的第n个元素,代表了第n列的名称;structFieldObjectInspectors的第n个元素,代表了第n列的类型。
structFieldNames和structFieldObjectInspectors应该保持长度一致
//只有一列,列的类型为Map<String,int>
return ObjectInspectorFactory.getStandardStructObjectInspector(
Collections.singletonList("result_column_name"),
Collections.singletonList(
ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector,//Key是String
PrimitiveObjectInspectorFactory.javaIntObjectInspector//Value是int
)
)
);

process()

核心方法,自定义UDTF的实现逻辑

代码实现步骤可以分为三部分:

  • 参数接收
  • 自定义UDTF核心逻辑
  • 输出结果
/**
*Give a set of arguments for the UDTF to process.
*
*@param args
*object array of arguments
*/
public abstract void process(Object[] args) throws HiveException;

1.参数接收

args即是自定义UDTF的参数,传入的参数不同,会是不同的Java类型,以下是Hive常用参数类型对应的Java类型

Hive类型 Java类型
tinyint ByteWritable
smallint ShortWritable
int IntWritable
bigint LongWritable
string Text
boolean BooleanWritable
float FloatWritable
double DoubleWritable
Array ArrayList
Map<K,V> HashMap<K,V>

参数接收示例:

//参数null值的特殊处理
if(args[0] == null)
return;

//接收参数
String str = ((Text)args[0]).toString();

2.自定义UDTF核心逻辑

获取参数之后,到这里就是自由发挥了~

3.输出结果

process()方法本身没有返回值,通过GenericUDTF中的forward()输出一行结果。forward()可以反复调用,可以输出任意行结果

/**
*Passes an output row to the collector.
*
*@param o
*@throws HiveException
*/
protected final void forward(Object o) throws HiveException {
collector.collect(o);
}
forward()可以接收List或Java数组,第n个元素代表第n列的值
List<Object> list = new LinkedList<>();
//第一列是int
list.add(1);
//第二列是string
list.add("hello");
//第三列是boolean
list.add(true);

//输出一行结果
forward(list);

close()

没有其他输入行时,调用该函数

可以进行一些资源关闭处理等最终处理

UDF相关语法

UDF使用需要将编写的UDF类编译为jar包添加到Hive中,根据需要创建临时函数或永久函数。

resources操作

Hive支持向会话中添加资源,支持文件、jar、存档,添加后即可在sql中直接引用,仅当前会话有效,默认读取本地路径,支持hdfs等,路径不加引号。例:add jar /opt/ht/AddUDF.jar;

#添加资源
ADD {FILE[S]|JAR[S]|ARCHIVE[S]} <filepath1> [<filepath2>]*
#查看资源
LIST {FILE[S]|JAR[S]|ARCHIVE[S]} [<filepath1> <filepath2>..]
#删除资源
DELETE {FILE[S]|JAR[S]|ARCHIVE[S]} [<filepath1> <filepath2>..]

临时函数

仅当前会话有效,不支持指定数据库,USING路径需加引号。

CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
DROP TEMPORARY FUNCTION [IF EXISTS] function_name;

永久函数

函数信息入库,永久有效,USING路径需加引号。临时函数与永久函数均可使用USING语句,Hive会自动添加指定文件到当前环境中,效果与add语句相同,执行后即可list查看已添加的文件或jar包。

CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
DROP FUNCTION [IF EXISTS] function_name;
RELOAD (FUNCTIONS|FUNCTION);

查看函数

#查看所有函数,不区分临时函数与永久函数
show functions;
#函数模糊查询,此处为查询x开头的函数
show functions like 'x*';
#查看函数描述
desc function function_name;
#查看函数详细描述
desc function extended function_name;

escription注解

Hive已定义注解类型org.apache.hadoop.hive.ql.exec.Description,用于执行descfunction[extended]function_name时介绍函数功能,内置函数与自定义函数用法相同。

若Description注解名称与创建UDF时指定名称不同,以创建UDF时指定名称为准。

public @interface Description {
//函数简单介绍
String value() default "_FUNC_is undocumented";
//函数详细使用说明
String extended() default "";
//函数名称
String name() default "";
}

Hive UDF实战—经纬度编码

在Idea中,新建一个Maven项目,命名为: GeoUtilsUdf。

pom.xml文件的修改

修改调整后的pom.xml:



4.0.0

com.ly
GeoUtilsUdf
1.0-SNAPSHOT



org.apache.hive
hive-exec
1.1.0
provided


org.apache.hadoop
hadoop-common
2.6.0


jdk.tools
jdk.tools





com.github.davidmoten
geo
0.8.0



com.uber
h3
4.1.1






org.apache.maven.plugins
maven-assembly-plugin


jar-with-dependencies




make-jar-with-dependencies
package

single





org.apache.maven.plugins
maven-compiler-plugin

8
8





pom.xml节点说明:

1.引入hive及hadoop依赖


org.apache.hive
hive-exec
1.1.0
provided


org.apache.hadoop
hadoop-common
2.6.0


jdk.tools
jdk.tools



注意,版本必须与线上的Hive和Hadoop版本一致。另外hive-exec的scope设为provided。因为Hive的lib目录下已经包括了hive-exec的jar,Hive会自动将其加载。所以这里就不需要再打包进入jar。

2.添加外部依赖,不重复造轮子。

使用外部已有的java包,一个是Geohash,另外一个是uberh3。


com.github.davidmoten
geo
0.8.0


com.uber
h3
4.1.1

3.打包设置,将外部依赖打包进入jar。

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-jar-with-dependencies</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

编写Java代码

在src/main/java目录下新建两个Package,并在分别为:

  • hive.udf.geohash
  • hive.udf.h3

在com.hive.udf.geohash下先建2个JavaClass:

  • UDFGeohashEncode(将经纬度转化为geohash)
  • UDFGeohashDecode(将geohash转化为经纬度)

代码如下:

#	UDFGeohashEncode.java
package com.hive.udf.geohash;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import com.github.davidmoten.geo.GeoHash;

public class UDFGeohashEncode extends UDF {

public Text evaluate(DoubleWritable longitude, DoubleWritable latitude, IntWritable precision){

if(latitude == null || longitude == null || precision == null){
return null;
}

return new Text(GeoHash.encodeHash(latitude.get(), longitude.get(), precision.get()));

}

}
#	UDFGeohashDecode.java
package com.hive.udf.geohash;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

import com.github.davidmoten.geo.GeoHash;
import com.github.davidmoten.geo.LatLong;

public class UDFGeohashDecode extends UDF {

public Text evaluate(Text geohash){

if(geohash == null){
return null;
}

LatLong location = GeoHash.decodeHash(geohash.toString());

return new Text(location.getLon() + "," + location.getLat());
}

}

在com.hive.udf.h3下先建3个JavaClass:

  • UDFH3Encode(将经纬度转化为H3)
  • UDFH3ToGeo(将H3转化为经纬度)
  • UDFH3ToGeoBoundary(将H3转化为6个顶点的经纬度)

代码示例:

#UDFH3Encode.java
package com.hive.udf.h3;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.uber.h3core.H3Core;

import java.io.IOException;


public class UDFH3Encode extends UDF {

public Text evaluate(DoubleWritable longitude, DoubleWritable latitude, IntWritable precision){
H3Core h3 = null;
try{
h3 = H3Core.newInstance();
}catch(IOException e){
e.printStackTrace();
}
if(latitude == null || longitude == null || precision == null){
return null;
}
return new Text(h3.latLngToCellAddress(latitude.get(), longitude.get(), precision.get()));
}

}

#UDFH3ToGeo.java
package com.hive.udf.h3;

import com.uber.h3core.H3Core;
import com.uber.h3core.util.LatLng;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;

public class UDFH3ToGeo extends GenericUDF {
private ObjectInspectorConverters.Converter[] converters;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
if (arguments.length != 1) {
throw new UDFArgumentLengthException(
“The function UDFH3ToGeo(s) takes exactly 1 arguments.”);
}
converters = new ObjectInspectorConverters.Converter[arguments.length];
converters[0] = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.writableStringObjectInspector);

return ObjectInspectorFactory.getStandardMapObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector,
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector);
}

@Override
public HashMap evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == 1);
H3Core h3 = null;
Text hexAddr = (Text) converters[0].convert(arguments[0].get());
if (hexAddr == null) {
return null;
}
try {
h3 = H3Core.newInstance();
} catch (IOException e) {
e.printStackTrace();
}

LatLng coords = h3.cellToLatLng(String.valueOf(hexAddr));
HashMap map_double = new HashMap();
try {
map_double.put(“lng”, coords.lng);
map_double.put(“lat”, coords.lat);
} catch (Exception ex) {
ex.printStackTrace();
}
return map_double;
}

@Override
public String getDisplayString(String[] strings) {
return null;
}
}

#UDFH3ToGeoBoundary.java
package com.hive.udf.h3;

import com.uber.h3core.H3Core;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class UDFH3ToGeoBoundary extends GenericUDF {

private ObjectInspectorConverters.Converter[] converters;
private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
private transient ArrayList<Object> ret = new ArrayList<Object>();


public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);

if (arguments.length != 1) {
throw new UDFArgumentLengthException(
"The function UDFH3ToGeoBoundary(s) takes exactly 1 arguments.");
}

converters = new ObjectInspectorConverters.Converter[arguments.length];
converters[0] = ObjectInspectorConverters.getConverter(arguments[0],
PrimitiveObjectInspectorFactory.javaStringObjectInspector);


ObjectInspector returnOI =
returnOIResolver.get(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);
}


@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == 1);
H3Core h3 = null;
String hexAddr = (String) converters[0].convert(arguments[0].get());
if (hexAddr == null) {
return null;
}
try {
h3 = H3Core.newInstance();
} catch (IOException e) {
e.printStackTrace();
}

List<com.uber.h3core.util.LatLng> geoCoords = h3.cellToBoundary(hexAddr);
ret.clear();
for (com.uber.h3core.util.LatLng str : geoCoords) {
StringBuilder sb = new StringBuilder();
sb.append(str.lng);
sb.append(",");
sb.append(str.lat);
ret.add(sb);
}
return ret;
}

public String getDisplayString(String[] strings) {
return null;
}

}

编译生成Jar包

命令行执行:mvn clean install

执行完成后会在 target 目录下生成 GeoUtilsUdf-1.0-SNAPSHOT-jar-with-dependencies.jar 文件,此文件就是最终我们要使用的 jar 包。

完成后上传到线上的目录。使用示例:

CREATE TEMPORARY FUNCTION geotoh3address AS 'com.hive.udf.h3.UDFH3Encode'
USING JAR 'viewfs://dcfs/ns-common/car/dev/udf_jar/GeoUtilsUdf-1.0-SNAPSHOT-jar-with-dependencies.jar';

SELECT geotoh3address(CAST(lng AS DOUBLE), CAST(lat AS DOUBLE), 7) AS h3hex
FROM testdb.test_table
LIMIT 10

不重复造轮子,以下一些开源的 Hive UDF 可以复制编译后使用:

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注