-
Notifications
You must be signed in to change notification settings - Fork 1
/
pl_funcs.c
110 lines (83 loc) · 2.66 KB
/
pl_funcs.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include "postgres.h"
#include "funcapi.h"
#include "port/atomics.h"
#include "utils/builtins.h"
#include "access/htup_details.h"
#include "pg_querylog.h"
/* Declare get_queries() as a version-1 calling convention function so it can be called from SQL. */
PG_FUNCTION_INFO_V1( get_queries );
/*
 * Per-scan state for get_queries(), kept in the SRF's user_fctx pointer
 * between calls.
 */
struct queries_view_fctx
{
	/*
	 * Position of the slot most recently examined.  get_queries() sets it to
	 * -1 on the first call and increments it before looking at each slot.
	 */
	int			index;
};

typedef struct queries_view_fctx queries_view_fctx;
/*
 * get_queries
 *		Set-returning function: emits one row per usable slot of the
 *		pgl_shared_queries array.
 *
 * SQL-level arguments:
 *		0 (bool) only_running	- skip slots whose 'running' field is false
 *		1 (bool) skip_overflow	- skip slots whose 'overflow' field is set
 *
 * The function works in value-per-call mode: the first call sets up the
 * result tuple descriptor and a cursor (queries_view_fctx), and every call
 * resumes the scan after the slot examined last, returning the next slot
 * that passes all filters.  When pgl_shared_hdr is NULL or the scan reaches
 * pgl_shared_hdr->count, the set is finished.
 *
 * Raises ERROR if the declared result type is not a row type, or if a slot
 * does not carry PG_QUERYLOG_ITEM_MAGIC.
 *
 * NOTE(review): the arguments are read with PG_GETARG_BOOL without a
 * PG_ARGISNULL check; presumably the function is declared STRICT - confirm
 * in the extension's SQL script.
 */
Datum
get_queries(PG_FUNCTION_ARGS)
{
	MemoryContext		old_mcxt;
	FuncCallContext	   *funccxt;
	queries_view_fctx  *usercxt;
	bool				only_running = PG_GETARG_BOOL(0);
	bool				skip_overflow = PG_GETARG_BOOL(1);

	if (SRF_IS_FIRSTCALL())
	{
		TupleDesc	tupdesc;

		funccxt = SRF_FIRSTCALL_INIT();

		/* State that must survive between calls lives in the multi-call context. */
		old_mcxt = MemoryContextSwitchTo(funccxt->multi_call_memory_ctx);
		usercxt = (queries_view_fctx *) palloc(sizeof(queries_view_fctx));

		/* The loop below pre-increments, so the first slot examined is 0. */
		usercxt->index = -1;

		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
			elog(ERROR, "return type must be a row type");

		funccxt->tuple_desc = BlessTupleDesc(tupdesc);
		funccxt->user_fctx = (void *) usercxt;
		MemoryContextSwitchTo(old_mcxt);
	}

	funccxt = SRF_PERCALL_SETUP();
	usercxt = (queries_view_fctx *) funccxt->user_fctx;

	/* Nothing to scan when the header pointer is not set. */
	while (pgl_shared_hdr)
	{
		uint64			gen;
		CollectedQuery *item;
		HeapTuple		htup;
		Datum			values[natts_queries_view];
		bool			isnull[natts_queries_view];
		const char	   *itembuf;

		usercxt->index++;

		/*
		 * Stop at the end of the slot array.  Use >= rather than ==: the
		 * cursor persists across calls, so if count is ever smaller than the
		 * saved index an equality test would never fire and the scan would
		 * run past the end of pgl_shared_queries.
		 */
		if (usercxt->index >= pgl_shared_hdr->count)
			break;

		item = (CollectedQuery *) (pgl_shared_queries + usercxt->index);
		itembuf = QUERYBUF(pgl_shared_hdr, usercxt->index);

		/* Cheap filters first: empty slot, then the caller's two options. */
		if (item->pid == 0)
			continue;

		if (only_running && !item->running)
			continue;

		if (skip_overflow && item->overflow)
			continue;

		/*
		 * Remember the slot's generation before copying its contents; it is
		 * compared again after the copy to detect a concurrent change.
		 * NOTE(review): presumably the writer bumps 'gen' whenever it reuses
		 * a slot - confirm against the code that fills pgl_shared_queries.
		 */
		pg_read_barrier();
		gen = item->gen;

		/*
		 * pg_atomic_unlocked_test_flag() returns true when the flag is clear,
		 * so this skips slots whose is_free flag is currently set.
		 */
		if (!pg_atomic_unlocked_test_flag(&item->is_free))
			continue;

		if (item->magic != PG_QUERYLOG_ITEM_MAGIC)
			elog(ERROR, "magic programming error");

		MemSet(values, 0, sizeof(values));
		MemSet(isnull, 0, sizeof(isnull));

		values[att_queries_start] = TimestampTzGetDatum(item->start);

		/* An end timestamp of 0 is reported as SQL NULL. */
		if (item->end != 0)
			values[att_queries_end] = TimestampTzGetDatum(item->end);
		else
			isnull[att_queries_end] = true;

		values[att_queries_pid] = Int32GetDatum(item->pid);
		values[att_queries_running] = BoolGetDatum(item->running);
		values[att_queries_overflow] = BoolGetDatum(item->overflow);

		/* The query text is copied by explicit length from the slot's buffer. */
		values[att_queries_query] = PointerGetDatum(
			cstring_to_text_with_len(itembuf, item->querylen));

		/*
		 * A zero params_offset means no parameters were stored.  Otherwise
		 * the parameters are read as a C string starting at that offset.
		 * NOTE(review): this relies on the string being NUL-terminated inside
		 * the buffer - confirm the writer guarantees it.
		 */
		if (item->params_offset)
			values[att_queries_params] = CStringGetTextDatum(itembuf + item->params_offset);
		else
			isnull[att_queries_params] = true;

		/* Discard the copy if the slot's generation changed while we read it. */
		pg_read_barrier();
		if (gen != item->gen)
			continue;

		/* form output tuple */
		htup = heap_form_tuple(funccxt->tuple_desc, values, isnull);
		SRF_RETURN_NEXT(funccxt, HeapTupleGetDatum(htup));
	}

	SRF_RETURN_DONE(funccxt);
}