This repository was archived by the owner on Sep 27, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 618
/
Copy pathcatalog_sync_brain_job.cpp
102 lines (95 loc) · 4.04 KB
/
catalog_sync_brain_job.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//===----------------------------------------------------------------------===//
//
// Peloton
//
// catalog_sync_brain_job.cpp
//
// Identification: src/brain/catalog_sync_brain_job.cpp
//
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include "brain/catalog_sync_brain_job.h"
namespace peloton {
namespace brain {
// Job entry point: walks every catalog reported by
// Catalog::AvailableCatalogs() and hands each one to SyncCatalog() together
// with the brain environment and the transaction manager instance.
//
// @param env  brain environment forwarded unchanged to SyncCatalog()
void CatalogSyncBrainJob::OnJobInvocation(BrainEnvironment *env) {
  auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
  auto &&catalog_instance = catalog::Catalog::GetInstance();
  for (auto *entry : catalog_instance->AvailableCatalogs()) {
    SyncCatalog(entry, env, txn_manager);
  }
}
time_t CatalogSyncBrainJob::TimeFromString(const char *str) {
struct tm tm;
PELOTON_MEMSET(&tm, 0, sizeof(struct tm));
strptime(str, "%Y-%m-%d %H:%M:%S", &tm);
return mktime(&tm);
}
// Builds the SQL text used to read the rows of `catalog` from pg_catalog.
//
// Every catalog except the query history catalog is fetched in full. The
// query history catalog is restricted to rows whose timestamp is greater than
// last_history_timestamp_.
//
// @param catalog  catalog whose name selects the table to query
// @return         the SELECT statement as a string
std::string CatalogSyncBrainJob::FetchCatalogQuery(
    catalog::AbstractCatalog *catalog) {
  const bool is_query_history =
      (catalog->GetName() == QUERY_HISTORY_CATALOG_NAME);
  if (!is_query_history) {
    return "SELECT * FROM pg_catalog." + catalog->GetName();
  }

  // This table needs to be special-cased because we cannot put a reasonable
  // primary key on it without sequences.
  std::string query = "SELECT * FROM pg_catalog.";
  query += std::string(QUERY_HISTORY_CATALOG_NAME);
  query += " WHERE timestamp > ";
  query += std::to_string(last_history_timestamp_);
  return query;
}
// Raises last_history_timestamp_ to the value held in `field` when that value
// is larger. Only acts on the "timestamp" column of the query history
// catalog; any other catalog or column leaves the member untouched.
//
// @param catalog  catalog the field's row was fetched for
// @param field    one field of a fetched row
void CatalogSyncBrainJob::UpdateTimestamp(catalog::AbstractCatalog *catalog,
                                          pqxx::field field) {
  const bool is_query_history =
      (catalog->GetName() == QUERY_HISTORY_CATALOG_NAME);
  if (!is_query_history) {
    return;
  }

  const bool is_timestamp_column = (field.name() == std::string("timestamp"));
  if (!is_timestamp_column) {
    return;
  }

  const int64_t seen = field.as<int64_t>();
  if (last_history_timestamp_ < seen) {
    last_history_timestamp_ = seen;
  }
}
// Fetches the rows of `catalog` through `env` and inserts each of them into
// the catalog, one transaction per row.
//
// For every fetched row a tuple is built against the catalog table's schema:
// each field is placed in the column whose name matches the field name, and
// UpdateTimestamp() is given the chance to record the field's value.
//
// @param catalog  catalog to fill; also determines the query that is run
// @param env      brain environment used to execute the fetch query
// @param manager  transaction manager used to begin/commit the inserts
void CatalogSyncBrainJob::SyncCatalog(
    catalog::AbstractCatalog *catalog, BrainEnvironment *env,
    concurrency::TransactionManager &manager) {
  pqxx::result fetched = env->ExecuteQuery(FetchCatalogQuery(catalog));
  for (auto row : fetched) {
    catalog::Schema *catalog_schema = catalog->GetDataTable()->GetSchema();
    std::unique_ptr<storage::Tuple> tuple(
        new storage::Tuple(catalog_schema, true));
    // Convert every field before a transaction exists: the conversion can
    // throw (e.g. ConversionException from PqxxFieldToPelotonValue), and an
    // exception raised after BeginTransaction() would leave a transaction
    // that is never committed or aborted.
    for (auto field : row) {
      oid_t column_id = catalog_schema->GetColumnID(field.name());
      tuple->SetValue(column_id, PqxxFieldToPelotonValue(field));
      UpdateTimestamp(catalog, field);
    }

    concurrency::TransactionContext *txn =
        manager.BeginTransaction(IsolationLevelType::REPEATABLE_READS);
    catalog->InsertTuple(std::move(tuple), txn);
    // We know this will always succeed on the brain side
    manager.CommitTransaction(txn);
  }
}
// Converts a single pqxx result field into a Peloton value.
//
// The field's Postgres type OID is mapped to a Peloton TypeId via
// PostgresValueTypeToPelotonValueType(). NULL fields become a NULL value of
// that type, except for VARCHAR where an empty string is produced instead.
//
// @param f  field to convert
// @return   the Peloton value holding the field's content
// @throws ConversionException if the mapped type has no case in the switch
//         below
type::Value CatalogSyncBrainJob::PqxxFieldToPelotonValue(pqxx::field &f) {
  const auto postgres_oid = f.type();
  const type::TypeId value_type = PostgresValueTypeToPelotonValueType(
      static_cast<PostgresValueType>(postgres_oid));

  if (f.is_null()) {
    return value_type == peloton::type::TypeId::VARCHAR
               ? type::ValueFactory::GetVarcharValue("")
               : type::ValueFactory::GetNullValueByType(value_type);
  }

  switch (value_type) {
    case type::TypeId::BOOLEAN:
      return type::ValueFactory::GetBooleanValue(f.as<bool>());
    case type::TypeId::TINYINT:
      // Read as a 32-bit integer, then narrowed to 8 bits.
      return type::ValueFactory::GetTinyIntValue(
          static_cast<int8_t>(f.as<int32_t>()));
    case type::TypeId::SMALLINT:
      // Read as a 32-bit integer, then narrowed to 16 bits.
      return type::ValueFactory::GetSmallIntValue(
          static_cast<int16_t>(f.as<int32_t>()));
    case type::TypeId::INTEGER:
      return type::ValueFactory::GetIntegerValue(f.as<int32_t>());
    case type::TypeId::BIGINT:
      return type::ValueFactory::GetBigIntValue(f.as<int64_t>());
    case type::TypeId::TIMESTAMP:
      return type::ValueFactory::GetTimestampValue(TimeFromString(f.c_str()));
    case type::TypeId::DECIMAL:
      return type::ValueFactory::GetDecimalValue(f.as<double>());
    case type::TypeId::VARCHAR:
      return type::ValueFactory::GetVarcharValue(f.c_str());
    default:
      // Report the Postgres type OID the message talks about, not the
      // already-converted Peloton TypeId.
      throw ConversionException(StringUtil::Format(
          "No corresponding c++ type for postgres type %d",
          static_cast<int>(postgres_oid)));
  }
}
} // namespace brain
} // namespace peloton