推广

Hive内置函数总结

iseeyu2年前 (2024-02-21)推广120

通过自定义 InputFormat 解决特殊分隔符

其原理是:在 InputFormat 读取每一行时,将数据中的“多字节分隔符”替换为 hive 默认的分隔符(ctrl+A,亦即 \x01)或用于替代的单字符分隔符,以便 hive 在 serde 阶段按照单字节分隔符进行字段抽取。下面的示例把数据中的“||”替换为单字符“|”,因此建表时相应地指定 fields terminated by '|'。
com.naixue.hive.delimit2.BiDelimiterInputFormat 代码如下:

package com.naixue.hive.delimit2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
/**
 * Text input format whose record reader is a {@link BiRecordReader} instead of
 * the stock line reader.
 */
public class BiDelimiterInputFormat extends TextInputFormat {

  /**
   * Publishes the split description as the task status and builds a
   * {@link BiRecordReader} over that split.
   *
   * @param genericSplit the split to read; it is cast to {@link FileSplit}
   * @param job          job configuration handed to the reader
   * @param reporter     receives the split's string form as status
   * @return a reader over the given split
   * @throws IOException if the reader cannot be created
   */
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
    // Status is reported before the cast, so it is set even if the cast fails.
    reporter.setStatus(genericSplit.toString());
    return new BiRecordReader(job, (FileSplit) genericSplit);
  }
}

com.naixue.hive.delimit2.BiRecordReader 代码如下:

package com.naixue.hive.delimit2;
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class BiRecordReader implements RecordReader<LongWritable, Text> {
private static final Log LOG =
LogFactory.getLog(LineRecordReader.class.getName());
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
int maxLineLength;
private Seekable filePosition;
private CompressionCodec codec;
private Decompressor decompressor;
/**
* A class that provides a line reader from an input stream.
* @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
*/
@Deprecated
public static class LineReader extends org.apache.hadoop.util.LineReader {
LineReader(InputStream in) {
super(in);
}
LineReader(InputStream in, int bufferSize) {
super(in, bufferSize);
}
public LineReader(InputStream in, Configuration conf)
throws IOException {
super(in, conf);
}
}
public BiRecordReader(Configuration job, FileSplit split) throws IOException
{
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec) codec)
.createInputStream(fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new LineReader(codec.createInputStream(fileIn,
decompressor), job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) {
return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
public BiRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineLength;
this.in = new LineReader(in);
this.start = offset;
this.pos = offset;
this.end = endOffset;
this.filePosition = null;
}
public BiRecordReader(InputStream in, long offset, long endOffset,
Configuration job) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
this.in = new LineReader(in, job);
this.start = offset;
this.pos = offset;
this.end = endOffset;
this.filePosition = null;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end) {
key.set(pos);
// 重点代码处
int newSize = in.readLine(value,
maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));
String str = value.toString().replaceAll("\\|\\|", "\\|");
value.set(str);
pos += newSize;
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos "
+ (pos - newSize));
}
return false;
}
/**
* Get the progress within the split
*/
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (getFilePosition() - start)
/ (float) (end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
}

注意:
1、上述代码中的 API 全部使用 Hadoop 的老 API 接口 org.apache.hadoop.mapred…。然后将工程打包,并拷贝至hive安装目录的lib文件夹中,并重启hive,使用以下语句建表

hive> create table new_bi(id string,name string) row format delimited fields
terminated by '|' stored as inputformat
'com.naixue.hive.delimit2.BiDelimiterInputFormat' outputformat
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
hive> load data local inpath '/home/bigdata/bi.dat' into table new_bi;
hive> select * from new_bi;
OK
01 huangbo
02 xuzheng
03 wangbaoqiang

2、还需要在 Hive 中使用 add jar,才能在执行 HQL 查询该表时把自定义 jar 包传递给 mapTask

hive> add jar /home/bigdata/apps/hive/lib/myinput.jar;

扫描二维码推送至手机访问。

版权声明:本文由西安泽虎代运营发布,如需转载请注明出处。

转载请注明出处https://www.0291.com.cn/post/56983.html

相关文章

大促会员活动策划逻辑拆解!

大促会员活动策划逻辑拆解!

  一、案例分享 小明最近天天去一家零食店签到,看到有活动就买买买,结果长胖了好几斤,不是小明嘴馋,实在是这家店套路太深了: 1. 免费领 618大促,小明无聊刷刷淘宝,发现一家店首页提示可以免费领芒果干,有便宜占不放过,小明按照指示绑了会员卡,1分钱加购了芒果干,付款时...

一文理清百度搜索CPC/eCPC/oCPC

一文理清百度搜索CPC/eCPC/oCPC

  很多同学在推广的时候,经常分不清什么是CPC、eCPC、oCPC,这几个CPC该什么时候用、怎么用?今天我们来一起来了解下这三个计费方式。 1、概念 CPC:手动出价,点击计费,按照每次广告点击的价格计费,比如百度每次点击价格是0.5元,那百度推广的CP...

秋刀鱼工具箱选词工具有什么功能

秋刀鱼工具箱选词工具有什么功能

关键词查询该宝贝对应的关键词,展现指数,点击指数、点击率,转化率、市场均价(付费市场价格)、竞争度、类目相关、搜索趋势、推荐度。...

应该站在不同的角度看待我们的优化工作。

应该站在不同的角度看待我们的优化工作。

当下搜索引擎在不断的发生着改动,我们搜索引擎优化人员的优化作业也应该有所立异,作为一个与时俱进的人,我们要不断的去进步自己的搜索引擎优化技术,不断的为我们大脑进行换血,让思维不断的得到新老替换,优化的目的便是为了更好的效力客户,那么,我们就应该站在不同的角度看待我们的优化作业,让自己的优化战略一贯保...

淘宝选词助手是什么软件(选词助手免费使用的吗)

淘宝选词助手是什么软件(选词助手免费使用的吗)

选词助手就是常用的运营软件生意参谋里面的一项功能,可以给我们提供热搜关键词,通过此软件,我们可以选择最优的关键词做宝贝标题,从而提升宝贝的排名。...

拼多多需要多少保证金才能开店

拼多多需要多少保证金才能开店

随着电商行业的飞速发展,越来越多的创业者选择在拼多多这个平台上开店。对于新手来说,最关心的问题莫过于拼多多需要多少保证金才能开店。今天,我就来给大家详细解答一下这个问题,让你读完这篇文章就能明白拼多多开店的保证金究竟是多少。 我们要明白,拼多多的保证金分为两部分:平台保证金和技术服务...

现在,非常期待与您的又一次邂逅

我们努力让每一部企业宣传片和抖音短视频成为商业大片