读改写TiDB数据 利用TiKV优化数据处理实战分析

网友投稿 377 2024-03-11



一切开始的原因

由于数据开发的需要,我一度尝试将tidb 的使用范围更大话,同时目前大数据开发中,内存当做堆料,对于公司的开支也会与很大压力,那么就我就尝试将tikv 当做kafka 和redis 使用,本文章中将讲述开发的过程以及衍生品;

读改写TiDB数据 利用TiKV优化数据处理实战分析

row_id 是什么

也许你和我在尝试使用tikv的时候会感觉网上的资料好像都是不太对劲的样子,比如:

https://tikv.github.io/client-java/examples/txnkv.html

我们看看这块代码:

import java.util.Arrays; import java.util.List; import org.tikv.common.BytePairWrapper; import org.tikv.common.ByteWrapper; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Kvrpcpb.KvPair; import org.tikv.shade.com.google.protobuf.ByteString; import org.tikv.txn.KVClient; import org.tikv.txn.TwoPhaseCommitter; public class App { public static void main(String[] args) throws Exception { TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379"); try (TiSession session = TiSession.create(conf)) { // two-phrase write long startTS = session.getTimestamp().getVersion(); try (TwoPhaseCommitter twoPhaseCommitter = new TwoPhaseCommitter(session, startTS)) { BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); byte[] primaryKey = "key1".getBytes("UTF-8"); byte[] key2 = "key2".getBytes("UTF-8"); // first phrase: prewrite twoPhaseCommitter.prewritePrimaryKey(backOffer,primaryKey, "val1".getBytes("UTF-8")); List<BytePairWrapper> pairs = Arrays .asList(new BytePairWrapper(key2, "val2".getBytes("UTF-8"))); twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 1000); // second phrase: commit long commitTS = session.getTimestamp().getVersion(); twoPhaseCommitter.commitPrimaryKey(backOffer, primaryKey, commitTS); List<ByteWrapper> keys = Arrays.asList(new ByteWrapper(key2)); twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 1000); } try (KVClient kvClient = session.createKVClient()) { long version = session.getTimestamp().getVersion(); ByteString key1 = ByteString.copyFromUtf8("key1"); ByteString key2 = ByteString.copyFromUtf8("key2"); // get value of a single key ByteString val = kvClient.get(key1, version); System.out.println(val); // get value of multiple keys BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); List<KvPair> kvPairs = kvClient.batchGet(backOffer,Arrays.asList(key1, key2), version); System.out.println(kvPairs); // get value of a range of keys kvPairs = kvClient.scan(key1, ByteString.copyFromUtf8("key3"), version); System.out.println(kvPairs); } } } }

首先我们打印一下key 长得样子

key = <ByteString@1b78595f size=19 contents="t\200\000\000\000\000\000\000\212_r\200\000\000\000\000\001N1">

看结果好像应该是有个是table_id,一个是row_id;

我们继续找资料:

官方文档中 region 切分 有提及:

Split Table Region

表中行数据的 key 由 table_id 和 row_id 编码组成,格式如下:

t[table_id]_r[row_id]

例如,当 table_id 是 22,row_id 是 11 时:

t22_r11

同一表中行数据的 table_id 是一样的,但 row_id 肯定不一样,所以可以根据 row_id 来切分 Region。

 TIDB_DECODE_KEY

TIDB_DECODE_KEY 函数用于将 TiDB 编码的键输入解码为包含 _tidb_rowid 和 table_id 的 JSON 结构。你可以在一些系统表和日志输出中找到 TiDB 的编码键。

语法图

语法图代码

TableStmtTIDB_DECODE_KEY(STR)示例

以下示例中,表 t1 有一个隐藏的 rowid,该 rowid 由 TiDB 生成。语句中使用了 TIDB_DECODE_KEY 函数。结果显示,隐藏的 rowid 被解码后并输出,这是典型的非聚簇主键结果。

SELECTSTART_KEY, TIDB_DECODE_KEY(START_KEY) FROM information_schema.tikv_region_status WHERE table_name=t1 AND REGION_ID=2\G *************************** 1. row *************************** START_KEY: 7480000000000000FF3B5F728000000000FF1DE3F10000000000FA TIDB_DECODE_KEY(START_KEY): {"_tidb_rowid":1958897,"table_id":"59"} 1 row in set (0.00 sec)

但是至此我们还是无法明白row_id 到底是怎么生成的,受什么控制:

终于我在一篇tidb 公众号文章(参考资料里)里面找到答案

文中这样写道:

至此我们已经聊完了如何将 Table 映射到 KV 上面,这里再举个简单的例子,便于大家理解,还是以上面的表结构为例。假设表中有 3 行数据: 1, "TiDB", "SQL Layer", 10 2, "TiKV", "KV Engine", 20 3, "PD", "Manager", 30 那么首先每行数据都会映射为一个 Key-Value pair,注意这个表有一个 Int 类型的 Primary Key,所以 RowID 的值即为这个 Primary Key 的值。假设这个表的 TableID10,其 Row 的数据为: t_r_10_1 --> ["TiDB", "SQL Layer", 10] t_r_10_2 --> ["TiKV", "KV Engine", 20] t_r_10_3 --> ["PD", "Manager", 30]

至此,我们可以得到一个结论,key包含两个信息,一个是table_id,另外一个是row_id,且当主键为INT 类型时,主键就是row_id;

able_id 可以通过如下获得;

TiTableInfo table = session.getCatalog().getTable("ods", "ods_consult_order_event"); long id = table.getId();

 写一个生成 key 的方法

我们再扒一扒tikv Client 源码:

protected static final byte[] TBL_PREFIX = new byte[]{116};// 这个其实就是t private static final byte[] REC_PREFIX_SEP = new byte[]{95, 114}; // 这个其实就是_r private static byte[] encode(long tableId, long handle) { CodecDataOutput cdo = new CodecDataOutput(); encodePrefix(cdo, tableId); IntegerCodec.writeLong(cdo, handle); return cdo.toBytes(); } private static void encodePrefix(CodecDataOutput cdo, long tableId) { cdo.write(TBL_PREFIX); IntegerCodec.writeLong(cdo, tableId); cdo.write(REC_PREFIX_SEP); } public static void writeLong(CodecDataOutput cdo,long lVal) { cdo.writeLong(flipSignBit(lVal)); } private static long flipSignBit(long v) { return v ^ -9223372036854775808L; } /** * Writes a <code>long</code> to the underlying output stream as eight * bytes, high byte first. In no exception is thrown, the counter * <code>written</code> is incremented by <code>8</code>. * * @param v a <code>long</code> to be written. * @exception IOException if an I/O error occurs. * @see java.io.FilterOutputStream#out */ public final void writeLong(long v) throws IOException { writeBuffer[0] = (byte)(v >>> 56); writeBuffer[1] = (byte)(v >>> 48); writeBuffer[2] = (byte)(v >>> 40); writeBuffer[3] = (byte)(v >>> 32); writeBuffer[4] = (byte)(v >>> 24); writeBuffer[5] = (byte)(v >>> 16); writeBuffer[6] = (byte)(v >>> 8); writeBuffer[7] = (byte)(v >>> 0); out.write(writeBuffer, 0, 8); incCount(8); }

通过以上我们可以写一个通过table_id,row_id 获取key 的方法:

package util; /** * @Maintainer 蜡笔老舅 * @Email * @CreateDate 2023/3/17 * @Version 1.0 * @Comment */ import java.io.Serializable; import org.tikv.common.codec.CodecDataInput; import org.tikv.common.codec.CodecDataOutput; import org.tikv.common.codec.Codec.IntegerCodec; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.exception.TiExpressionException; import org.tikv.common.key.Key; import org.tikv.common.key.TypedKey; import static org.tikv.common.key.RowKey.toRowKey; public class TidbRowKeyUtil extends Key implements Serializable { private static final byte[] REC_PREFIX_SEP = new byte[]{95, 114}; private final long tableId; private final long handle; private final boolean maxHandleFlag; private TidbRowKeyUtil(long tableId, long handle) { super(encode(tableId, handle)); this.tableId = tableId; this.handle = handle; this.maxHandleFlag = false; } public static byte[] encode(long tableId, long handle) { CodecDataOutput cdo = new CodecDataOutput(); encodePrefix(cdo, tableId); encodeRow(cdo,handle); return cdo.toBytes(); } private static void encodePrefix(CodecDataOutput cdo, long tableId) { // 增加 /t table_id 标识 cdo.write(TBL_PREFIX); cdo.writeLong(tableId ^ Long.MIN_VALUE); // 写入一个 /r row_id 标识 cdo.write(REC_PREFIX_SEP); } private static void encodeRow(CodecDataOutput cdo, long tableId) { // 对row_id 进行编码 cdo.writeLong(tableId ^ Long.MIN_VALUE); } }

然后我们验证:

package tidb; import com.alibaba.fastjson.JSONObject; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.key.RowKey; import org.tikv.common.meta.TiColumnInfo; import org.tikv.common.meta.TiTableInfo; import org.tikv.kvproto.Kvrpcpb; import org.tikv.shade.com.google.protobuf.ByteString; import org.tikv.txn.KVClient; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; import static org.tikv.common.codec.TableCodec.decodeObjects; /** * @Maintainer 蜡笔老舅 * @Email * @CreateDate 2023/3/17 * @Version 1.0 * @Comment */ public class KVClientMatchTest implements Runnable{ public static void main(String[] args) { //3.创建实现类的对象 KVClientMatchTest mThread = new KVClientMatchTest(); //4.将此对象作为参数传递到Thread类的构造器中,创建Thread类的对象 Thread t1 = new Thread(mThread); t1.setName("线程1"); t1.start(); // // Thread t2 = new Thread(mThread); // t2.setName("线程2"); // t2.start(); // // Thread t3 = new Thread(mThread); // t3.setName("线程3"); // t3.start(); // // Thread t4 = new Thread(mThread); // t4.setName("线程4"); // t4.start(); // // Thread t5 = new Thread(mThread); // t5.setName("线程5"); // t5.start(); } private static void getKey(TiSession session, KVClient kvClient, Integer i) { long version = session.getTimestamp().getVersion(); System.out.println("version = " + version); System.out.println("System.currentTimeMillis() = " + System.currentTimeMillis()); TiTableInfo table = session.getCatalog().getTable("库名", "表名"); byte[] encode = TidbRowKeyUtil.encode(table.getId(), i); ByteString key = ByteString.copyFrom(encode); ByteString bytes = kvClient.get(key, version); System.out.println("bytes.size() = " + bytes.size()); if (bytes.size()!=0) { Kvrpcpb.KvPair.Builder builder = Kvrpcpb.KvPair.newBuilder();builder.setKey(key); builder.setValue(bytes); Kvrpcpb.KvPair build = builder.build();

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:试用云数据库TiDB的心得
下一篇:资源池化 多租户与数据库整合方案
相关文章