2022-09-02 mysql/stonedb-读取Pack数据流程记录

目录

​​摘要:​​

​​逻辑建模:​​

​​动态结构-时序图:​​

​​加载字符类型的列:​​

​​调用堆栈:​​

​​核心函数:​​

​​Descriptor::LockSourcePacks​​

​​VirtualColumn::LockSourcePacks​​

​​VCPackGuardian::LockPackrow​​

​​RCAttr::LockPackForUse​​

​​DataCache::GetOrFetchObject​​

​​RCAttr::Fetch​​

​​PackStr::PackStr​​

​​PackStr::LoadDataFromFile​​

​​PackStr::LoadCompressed​​


摘要:

本文主要记录在查询过程中,数据的获取过程。

涉及从物理层到逻辑层的数据处理。

逻辑建模:

动态结构-时序图:

读取字符串类型的列

静态结构-类图

2022-09-02 mysql/stonedb-读取Pack数据流程记录

加载字符类型的列:

调用堆栈:

(gdb) bt
#0 Tianmu::core::PackStr::LoadDataFromFile (this=0x7ef154069870, f=0x7f14c1780df0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/pack_str.cpp:99
#1 0x00000000030a8199 in Tianmu::core::PackStr::PackStr (this=0x7ef154069870, dpn=0x7ef14d9ff140, pc=..., s=0x7ef154030da0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/pack_str.cpp:57
#2 0x00000000030e0bad in __gnu_cxx::new_allocator<Tianmu::core::PackStr>::construct<Tianmu::core::PackStr<Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> > (this=0x7f14c1780fc7, __p=0x7ef154069870, __args#0=@0x7f14c17811f8: 0x7ef14d9ff140, __args#1=...,
__args#2=@0x7ef15403eb18: 0x7ef154030da0) at /opt/rh/devtoolset-7/root/usr/include/c++/7/ext/new_allocator.h:136
#3 0x00000000030dfd38 in std::allocator_traits<std::allocator<Tianmu::core::PackStr> >::construct<Tianmu::core::PackStr<Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> > (__a=..., __p=0x7ef154069870, __args#0=@0x7f14c17811f8: 0x7ef14d9ff140, __args#1=...,
__args#2=@0x7ef15403eb18: 0x7ef154030da0) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/alloc_traits.h:475
#4 0x00000000030de2e2 in std::_Sp_counted_ptr_inplace<Tianmu::core::PackStr, std::allocator<Tianmu::core::PackStr>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> (this=0x7ef154069860, __a=...)
at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/shared_ptr_base.h:526
#5 0x00000000030dc0df in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<Tianmu::core::PackStr, std::allocator<Tianmu::core::PackStr>, Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> (this=0x7f14c1781208, __a=...)
at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/shared_ptr_base.h:637
#6 0x00000000030d9dfa in std::__shared_ptr<Tianmu::core::PackStr, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<Tianmu::core::PackStr>, Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> (this=0x7f14c1781200, __tag=..., __a=...)
at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/shared_ptr_base.h:1295
#7 0x00000000030d8473 in std::shared_ptr<Tianmu::core::PackStr>::shared_ptr<std::allocator<Tianmu::core::PackStr>, Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> (this=0x7f14c1781200, __tag=..., __a=...) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/shared_ptr.h:344
#8 0x00000000030d6eff in std::allocate_shared<Tianmu::core::PackStr, std::allocator<Tianmu::core::PackStr>, Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> (__a=..., __args#0=@0x7f14c17811f8: 0x7ef14d9ff140, __args#1=..., __args#2=@0x7ef15403eb18: 0x7ef154030da0)
at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/shared_ptr.h:691
#9 0x00000000030d4ef5 in std::make_shared<Tianmu::core::PackStr, Tianmu::core::DPN*&, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty> const&, Tianmu::core::ColumnShare*&> (__args#0=@0x7f14c17811f8: 0x7ef14d9ff140, __args#1=..., __args#2=@0x7ef15403eb18: 0x7ef154030da0)
at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/shared_ptr.h:707
#10 0x00000000030cd7fd in Tianmu::core::RCAttr::Fetch (this=0x7ef15403e980, pc=...) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/rc_attr.cpp:793
#11 0x00000000030d4a97 in Tianmu::core::DataCache::GetOrFetchObject<Tianmu::core::Pack, Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty>, Tianmu::core::RCAttr> (this=0x67412c0, coord_=..., fetcher_=0x7ef15403e980) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/data_cache.h:232
#12 0x00000000030ccfdc in Tianmu::core::RCAttr::LockPackForUse (this=0x7ef15403e980, pn=5)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/rc_attr.cpp:737
#13 0x0000000002db5694 in Tianmu::core::RCTable::LockPackForUse (this=0x7ef15403b770, attr=3, pack_no=5)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/rc_table.cpp:207
#14 0x00000000030a5349 in Tianmu::core::VCPackGuardian::LockPackrow (this=0x7ef154055d08, mit=...)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/pack_guardian.cpp:75
#15 0x0000000002d8a41a in Tianmu::vcolumn::VirtualColumn::LockSourcePacks (this=0x7ef154055c30, mit=...)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/vc/virtual_column.h:45
#16 0x000000000305d76a in Tianmu::core::Descriptor::LockSourcePacks (this=0x7ef154062850, mit=...)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/descriptor.cpp:786
#17 0x000000000305dd4a in Tianmu::core::Descriptor::EvaluatePack (this=0x7ef154062850, mit=...)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/descriptor.cpp:866
#18 0x00000000030bfd87 in Tianmu::core::ParameterizedFilter::ApplyDescriptor (this=0x7ef1540549a0, desc_number=0, limit=-1)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/parameterized_filter.cpp:1373
#19 0x00000000030be329 in Tianmu::core::ParameterizedFilter::UpdateMultiIndex (this=0x7ef1540549a0, count_only=false, limit=-1)
---Type <return> to continue, or q <return> to quit---
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/parameterized_filter.cpp:1092
#20 0x0000000002d80377 in Tianmu::core::Query::Preexecute (this=0x7f14c1782800, qu=..., sender=0x7ef154054760, display_now=true)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/query.cpp:777
#21 0x0000000002d51b76 in Tianmu::core::Engine::Execute (this=0x6741050, thd=0x7ef154002bd0, lex=0x7ef154004ef8, result_output=0x7ef15401fc70, unit_for_union=0x0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/engine_execute.cpp:421
#22 0x0000000002d50e04 in Tianmu::core::Engine::HandleSelect (this=0x6741050, thd=0x7ef154002bd0, lex=0x7ef154004ef8, result=@0x7f14c1782d18: 0x7ef15401fc70, setup_tables_done_option=0,
res=@0x7f14c1782d14: 0, optimize_after_tianmu=@0x7f14c1782d0c: 1, tianmu_free_join=@0x7f14c1782d10: 1, with_insert=0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/engine_execute.cpp:232
#23 0x0000000002e39861 in Tianmu::dbhandler::TIANMU_HandleSelect (thd=0x7ef154002bd0, lex=0x7ef154004ef8, result=@0x7f14c1782d18: 0x7ef15401fc70, setup_tables_done_option=0,
res=@0x7f14c1782d14: 0, optimize_after_tianmu=@0x7f14c1782d0c: 1, tianmu_free_join=@0x7f14c1782d10: 1, with_insert=0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/handler/ha_rcengine.cpp:82
#24 0x00000000024793ba in execute_sqlcom_select (thd=0x7ef154002bd0, all_tables=0x7ef15401ca50) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/sql/sql_parse.cc:5182
#25 0x000000000247273e in mysql_execute_command (thd=0x7ef154002bd0, first_level=true) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/sql/sql_parse.cc:2831
#26 0x000000000247a383 in mysql_parse (thd=0x7ef154002bd0, parser_state=0x7f14c1783eb0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/sql/sql_parse.cc:5621
#27 0x000000000246f61b in dispatch_command (thd=0x7ef154002bd0, com_data=0x7f14c1784650, command=COM_QUERY)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/sql/sql_parse.cc:1495
#28 0x000000000246e547 in do_command (thd=0x7ef154002bd0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/sql/sql_parse.cc:1034
#29 0x00000000025a1147 in handle_connection (arg=0x10f92fc0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/sql/conn_handler/connection_handler_per_thread.cc:313
#30 0x0000000002c8757c in pfs_spawn_thread (arg=0xbf73e40) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/perfschema/pfs.cc:2197
#31 0x00007f14cbca7ea5 in start_thread () from /lib64/libpthread.so.0
#32 0x00007f14c987cb0d in clone () from /lib64/libc.so.6

核心函数:

Descriptor::LockSourcePacks

void Descriptor::LockSourcePacks(const MIIterator &mit) {
if (tree) tree->root->PrepareToLock(0);
if (attr.vc) attr.vc->LockSourcePacks(mit);
if (val1.vc) val1.vc->LockSourcePacks(mit);
if (val2.vc) val2.vc->LockSourcePacks(mit);
}

VirtualColumn::LockSourcePacks

void LockSourcePacks(const core::MIIterator &mit) override { pguard.LockPackrow(mit); }

VCPackGuardian::LockPackrow

void VCPackGuardian::LockPackrow(const MIIterator &mit) {
int threadId = mit.GetTaskId();
int taskNum = mit.GetTaskNum();
{
std::scoped_lock g(mx_thread);
if (!initialized) {
Initialize(taskNum);
}
if (initialized && (taskNum > threads)) {
// recheck to make sure last_pack is not overflow
ResizeLastPack(taskNum);
}
}
for (auto iter = my_vc.GetVarMap().cbegin(); iter != my_vc.GetVarMap().cend(); iter++) {
int cur_dim = iter->dim;
if (last_pack[cur_dim][threadId] != mit.GetCurPackrow(cur_dim)) {
JustATable *tab = iter->GetTabPtr().get();
if (last_pack[cur_dim][threadId] != common::NULL_VALUE_32)
tab->UnlockPackFromUse(iter->col_ndx, last_pack[cur_dim][threadId]);
try {
tab->LockPackForUse(iter->col_ndx, mit.GetCurPackrow(cur_dim));
} catch (...) {
// unlock packs which are partially locked for this packrow
auto it = my_vc.GetVarMap().begin();
for (; it != iter; ++it) {
int cur_dim = it->dim;
if (last_pack[cur_dim][threadId] != mit.GetCurPackrow(cur_dim) &&
last_pack[cur_dim][threadId] != common::NULL_VALUE_32)
it->GetTabPtr()->UnlockPackFromUse(it->col_ndx, mit.GetCurPackrow(cur_dim));
}

for (++iter; iter != my_vc.GetVarMap().end(); ++iter) {
int cur_dim = iter->dim;
if (last_pack[cur_dim][threadId] != mit.GetCurPackrow(cur_dim) &&
last_pack[cur_dim][threadId] != common::NULL_VALUE_32)
iter->GetTabPtr()->UnlockPackFromUse(iter->col_ndx, last_pack[cur_dim][threadId]);
}

for (auto const &iter : my_vc.GetVarMap()) last_pack[iter.dim][threadId] = common::NULL_VALUE_32;
throw;
}
}
}
for (auto const &iter : my_vc.GetVarMap())
last_pack[iter.dim][threadId] = mit.GetCurPackrow(iter.dim); // must be in a separate loop, otherwise
// for "a + b" will not lock b
}

RCAttr::LockPackForUse

void RCAttr::LockPackForUse(common::PACK_INDEX pn) {
auto dpn = &get_dpn(pn);
if (dpn->IsLocal()) dpn = m_share->get_dpn_ptr(dpn->base);

if (dpn->Trivial() && !dpn->IsLocal()) return;

while (true) {
if (dpn->IncRef()) return;

// either the pack is not loaded yet or other thread is loading it

uint64_t v = 0;
if (dpn->CAS(v, loading_flag)) {
// we win the chance to load data
std::shared_ptr<Pack> sp;
try {
sp = ha_rcengine_->cache.GetOrFetchObject<Pack>(get_pc(pn), this);
} catch (std::exception &e) {
dpn->SetPackPtr(0);
TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
throw e;
} catch (...) {
dpn->SetPackPtr(0);
TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
throw;
}

uint64_t newv = reinterpret_cast<unsigned long>(sp.get()) + tag_one;
uint64_t expected = loading_flag;
ASSERT(dpn->CAS(expected, newv),
"bad loading flag" + std::to_string(newv) + ". " + Path().string() + " index:" + std::to_string(pn));
return;
}
// some one is loading data, wait a while and retry
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

DataCache::GetOrFetchObject

template <typename T, typename U, typename V>
std::shared_ptr<T> GetOrFetchObject(U const &coord_, V *fetcher_) {
auto &c(cache<U>());
auto &w(waitIO<U>());
auto &cond(condition<U>());
bool waited = false;
/* a scope for mutex lock */
{
/* Lock is acquired inside */
std::unique_lock<std::recursive_mutex> m_obj_guard(mm::TraceableObject::GetLockingMutex());

std::scoped_lock lock(m_cache_mutex);

auto it = c.find(coord_);

if (it != c.end()) {
if constexpr (U::ID == COORD_TYPE::PACK) {
it->second->Lock();
++m_cacheHits;
it->second->TrackAccess();
}
return std::static_pointer_cast<T>(it->second);
}

{
if constexpr (U::ID == COORD_TYPE::PACK) m_cacheMisses++;
auto rit = w.find(coord_);
while (rit != w.end()) {
m_readWaitInProgress++;
if (waited)
m_falseWakeup++;
else
m_readWait++;

m_cache_mutex.unlock();
cond.wait(m_obj_guard);
m_cache_mutex.lock();

waited = true;
m_readWaitInProgress--;
rit = w.find(coord_);
}
it = c.find(coord_);
if (it != c.end()) {
if constexpr (U::ID == COORD_TYPE::PACK) {
it->second->Lock();
it->second->TrackAccess();
}
return std::static_pointer_cast<T>(it->second);
}
// mm::TraceableObject::GetLockingMutex().Unlock();
// if we get here the obj has been loaded, used, unlocked
// and pushed out of memory before we should get to it after
// waiting
}
w.insert(coord_);
m_packLoadInProgress++;
}

std::shared_ptr<T> obj;
try {
obj = fetcher_->Fetch(coord_);
} catch (...) {
std::scoped_lock m_obj_guard(mm::TraceableObject::GetLockingMutex());
std::scoped_lock lock(m_cache_mutex);
m_loadErrors++;
m_packLoadInProgress--;
w.erase(coord_);
cond.notify_all();
throw;
}

{
obj->SetOwner(this);
std::scoped_lock m_obj_guard(mm::TraceableObject::GetLockingMutex());
std::scoped_lock lock(m_cache_mutex);
if constexpr (U::ID == COORD_TYPE::PACK) {
m_packLoads++;
obj->TrackAccess();
}
m_packLoadInProgress--;
DEBUG_ASSERT(c.find(coord_) == c.end());
c.insert(std::make_pair(coord_, obj));
w.erase(coord_);
}
cond.notify_all();

return obj;
}

RCAttr::Fetch

std::shared_ptr<Pack> RCAttr::Fetch(const PackCoordinate &pc) {
auto dpn = m_share->get_dpn_ptr(pc_dp(pc));
if (GetPackType() == common::PackType::STR) return std::make_shared<PackStr>(dpn, pc, m_share);
return std::make_shared<PackInt>(dpn, pc, m_share);
}

PackStr::PackStr

PackStr::PackStr(DPN *dpn, PackCoordinate pc, ColumnShare *s) : Pack(dpn, pc, s) {
auto t = s->ColType().GetTypeName();

if (t == common::CT::BIN || t == common::CT::LONGTEXT)
data.len_mode = sizeof(uint32_t);
else
data.len_mode = sizeof(uint16_t);

try {
data.index = (char **)alloc(sizeof(char *) * (1 << s->pss), mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
data.lens = alloc((data.len_mode * (1 << s->pss)), mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(data.lens, 0, data.len_mode * (1 << s->pss));

if (!dpn->NullOnly()) {
system::TianmuFile f;
f.OpenReadOnly(s->DataFile());
f.Seek(dpn->addr, SEEK_SET);
LoadDataFromFile(&f);
}
} catch (...) {
Destroy();
throw;
}
}
system::TianmuFile f;
f.OpenReadOnly(s->DataFile());
f.Seek(dpn->addr, SEEK_SET);
LoadDataFromFile(&f);

PackStr::LoadDataFromFile

void PackStr::LoadDataFromFile(system::Stream *f) {
FunctionExecutor fe([this]() { Lock(); }, [this]() { Unlock(); });

if (IsModeNoCompression()) {
LoadUncompressed(f);
} else if (s->ColType().GetFmt() == common::PackFmt::TRIE) {
LoadCompressedTrie(f);
} else {
LoadCompressed(f);
}
}

PackStr::LoadCompressed

void PackStr::LoadCompressed(system::Stream *f) {
ASSERT(IsModeCompressionApplied());

auto compressed_buf = alloc_ptr(dpn->len + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED);
f->ReadExact(compressed_buf.get(), dpn->len);

dpn->synced = true;

// if (ATI::IsBinType(s->ColType().GetTypeName())) {
// throw common::Exception("Compression format no longer supported.");
//}

// uncompress the data
mm::MMGuard<char *> tmp_index((char **)alloc(dpn->nr * sizeof(char *), mm::BLOCK_TYPE::BLOCK_TEMPORARY), *this);

char *cur_buf = reinterpret_cast<char *>(compressed_buf.get());

char (*FREE_PLACE)(reinterpret_cast<char *>(-1));

uint null_buf_size = 0;
if (dpn->nn > 0) {
null_buf_size = (*(ushort *)cur_buf);
if (!IsModeNullsCompressed()) // flat null encoding
std::memcpy(nulls.get(), cur_buf + 2, null_buf_size);
else {
compress::BitstreamCompressor bsc;
CprsErr res = bsc.Decompress((char *)nulls.get(), null_buf_size, cur_buf + 2, dpn->nr, dpn->nn);
if (res != CprsErr::CPRS_SUCCESS) {
throw common::DatabaseException("Decompression of nulls failed for column " +
std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " +
std::to_string(pc_dp(GetCoordinate().co.pack) + 1) + " (error " +
std::to_string(static_cast<int>(res)) + ").");
}
}
cur_buf += (null_buf_size + 2);

for (uint i = 0; i < dpn->nr; i++) {
if (IsNull(i))
tmp_index[i] = nullptr;
else
tmp_index[i] = FREE_PLACE; // special value: an object awaiting decoding
}
} else
for (uint i = 0; i < dpn->nr; i++) tmp_index[i] = FREE_PLACE;

auto comp_len_buf_size = *(uint32_t *)cur_buf;
auto maxv = *(uint32_t *)(cur_buf + 4);

if (maxv != 0) {
compress::NumCompressor<uint> nc;
mm::MMGuard<uint> cn_ptr((uint *)alloc((1 << s->pss) * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY), *this);
CprsErr res = nc.Decompress(cn_ptr.get(), (char *)(cur_buf + 8), comp_len_buf_size - 8, dpn->nr - dpn->nn, maxv);
if (res != CprsErr::CPRS_SUCCESS) {
std::stringstream msg_buf;
msg_buf << "Decompression of lengths of std::string values failed for column "
<< (pc_column(GetCoordinate().co.pack) + 1) << ", pack " << (pc_dp(GetCoordinate().co.pack) + 1)
<< " (error " << static_cast<int>(res) << ").";
throw common::DatabaseException(msg_buf.str());
}

int oid = 0;
for (uint o = 0; o < dpn->nr; o++)
if (!IsNull(int(o))) SetSize(o, (uint)cn_ptr[oid++]);
} else {
for (uint o = 0; o < dpn->nr; o++)
if (!IsNull(int(o))) SetSize(o, 0);
}
cur_buf += comp_len_buf_size;

auto dlen = *(uint32_t *)cur_buf;
cur_buf += sizeof(dlen);
data.sum_len = *(uint32_t *)cur_buf;
cur_buf += sizeof(uint32_t);

ASSERT(cur_buf + dlen == dpn->len + reinterpret_cast<char *>(compressed_buf.get()),
std::to_string(data.sum_len) + "/" + std::to_string(dpn->len) + "/" + std::to_string(dlen));

int zlo = 0;
for (uint obj = 0; obj < dpn->nr; obj++)
if (!IsNull(obj) && GetSize(obj) == 0) zlo++;
int objs = dpn->nr - dpn->nn - zlo;

if (objs) {
mm::MMGuard<uint> tmp_len((uint *)alloc(objs * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY), *this);
for (uint tmp_id = 0, id = 0; id < dpn->nr; id++)
if (!IsNull(id) && GetSize(id) != 0) tmp_len[tmp_id++] = GetSize(id);

if (dlen) {
data.v.push_back({(char *)alloc(data.sum_len, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED), data.sum_len, 0});
compress::TextCompressor tc;
CprsErr res =
tc.Decompress(data.v.front().ptr, data.sum_len, cur_buf, dlen, tmp_index.get(), tmp_len.get(), objs);
if (res != CprsErr::CPRS_SUCCESS) {
std::stringstream msg_buf;
msg_buf << "Decompression of std::string values failed for column " << (pc_column(GetCoordinate().co.pack) + 1)
<< ", pack " << (pc_dp(GetCoordinate().co.pack) + 1) << " (error " << static_cast<int>(res) << ").";
throw common::DatabaseException(msg_buf.str());
}
}
}

for (uint tmp_id = 0, id = 0; id < dpn->nr; id++) {
if (!IsNull(id) && GetSize(id) != 0)
SetPtr(id, (char *)tmp_index[tmp_id++]);
else {
SetSize(id, 0);
SetPtr(id, 0);
}
}
}