Skip to content

Commit

Permalink
Merge pull request #37 from RatioPBC/bulk_fhir
Browse files Browse the repository at this point in the history
bulk fhir import support
  • Loading branch information
lei0zhou authored Sep 10, 2024
2 parents 429fcca + 49426d7 commit b28360b
Show file tree
Hide file tree
Showing 65 changed files with 3,101 additions and 8 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ config :dart_sass,
cd: Path.expand("../assets", __DIR__)
]

config :kindling, root_resources: ["Patient", "Observation", "Organization"]

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
229 changes: 229 additions & 0 deletions lib/epiviewpoint/bulk_fhir_parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
defmodule EpiViewpoint.BulkFhirParser do
def parse_bulk_fhir(file_list) do
with {:ok, contents} <- combine_contents(file_list),
{:ok, resources} <- load_resources(file_list),
{:ok, extracted} <- extract_resources(resources),
{:ok, joined} <- join_resources(extracted) do
{:ok, to_map(joined, contents)}
end
end

defp combine_contents(file_list) do
contents = file_list |> Enum.map(& &1.contents) |> List.to_string()
{:ok, contents}
end

defp load_resources(file_list) do
result =
Enum.reduce_while(file_list, {:ok, %{}}, fn file, {:ok, acc} ->
case load_resource_file(file.contents) do
{:ok, resources} -> {:cont, {:ok, group_resources(resources, acc)}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)

case result do
{:ok, resources} -> {:ok, resources}
{:error, reason} -> {:error, reason}
end
end

defp group_resources(resources, acc) do
Enum.reduce(resources, acc, fn resource, acc ->
Map.update(acc, resource.resource_type, [resource], &[resource | &1])
end)
end

defp load_resource_file(file_content) do
result =
file_content
|> String.split("\n")
|> Stream.map(&String.trim/1)
|> Stream.filter(&(&1 != ""))
|> Stream.map(&json_to_kindle_schema/1)
|> Enum.to_list()

{:ok, result}
rescue
e -> {:error, "Failed to load resource file: #{inspect(e)}"}
end

defp json_to_kindle_schema(json) do
case Jason.decode(json) do
{:ok, map} -> Kindling.Converter.convert("EpiViewpoint.R4", map)
{:error, reason} -> {:error, "Failed to decode JSON: #{inspect(reason)}"}
end
end

defp extract_resources(resources) do
result =
Enum.reduce_while(resources, {:ok, %{}}, fn {resource_type, resource_list}, {:ok, acc} ->
case extract_resource(resource_type, resource_list) do
{:ok, extracted} -> {:cont, {:ok, Map.put(acc, resource_type, extracted)}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)

case result do
{:ok, extracted} -> {:ok, extracted}
{:error, reason} -> {:error, reason}
end
end

defp extract_resource("Patient", resource_list) do
{:ok, Enum.map(resource_list, &extract_patient/1)}
end

defp extract_resource("Observation", resource_list) do
{:ok, Enum.map(resource_list, &extract_observation/1)}
end

defp extract_resource("Organization", resource_list) do
{:ok, Enum.map(resource_list, &extract_organization/1)}
end

defp extract_resource(unknown_type, _) do
{:error, "Unknown resource type: #{unknown_type}"}
end

defp extract_patient(%EpiViewpoint.R4.Patient{} = patient) do
%EpiViewpoint.R4.Patient{
id: caseid,
identifier: [%EpiViewpoint.R4.Identifier{value: person_tid}],
name: [%EpiViewpoint.R4.HumanName{given: [search_firstname], family: search_lastname}],
birth_date: dateofbirth,
gender: sex,
address: [
%EpiViewpoint.R4.Address{
line: [diagaddress_street1],
city: diagaddress_city,
state: diagaddress_state,
postal_code: diagaddress_zip
}
],
telecom: [%EpiViewpoint.R4.ContactPoint{value: phonenumber}],
extension: extensions
} = patient

%{
caseid: caseid,
person_tid: person_tid,
search_firstname: search_firstname,
search_lastname: search_lastname,
dateofbirth: format_date(dateofbirth),
sex: String.capitalize(to_string(sex)),
diagaddress_street1: diagaddress_street1,
diagaddress_city: diagaddress_city,
diagaddress_state: diagaddress_state,
diagaddress_zip: diagaddress_zip,
phonenumber: phonenumber,
ethnicity: find_extension(extensions, :ethnicity),
occupation: find_extension(extensions, :occupation),
race: find_extension(extensions, :race)
}
end

defp extract_observation(%EpiViewpoint.R4.Observation{} = observation) do
%EpiViewpoint.R4.Observation{
id: lab_result_tid,
subject: %EpiViewpoint.R4.Reference{reference: "Patient/" <> pat_id},
effective_date_time: datecollected,
issued: resultdate,
code: %EpiViewpoint.R4.CodeableConcept{text: testname},
interpretation: [%EpiViewpoint.R4.CodeableConcept{coding: [%EpiViewpoint.R4.Coding{display: result}]}],
performer: [%EpiViewpoint.R4.Reference{reference: "Organization/" <> org_id}],
extension: extensions
} = observation

%{
lab_result_tid: lab_result_tid,
pat_id: pat_id,
datecollected: datecollected,
resultdate: format_date(resultdate),
testname: testname,
result: result,
org_id: org_id,
datereportedtolhd: find_extension(extensions, :datereportedtolhd)
}
end

defp extract_organization(%EpiViewpoint.R4.Organization{} = organization) do
%{
organization_id: organization.id,
ordering_facility_name: organization.name
}
end

defp find_extension(extensions, :race) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{
url: "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race",
extension: [%EpiViewpoint.R4.Extension{url: "ombCategory", value_coding: %EpiViewpoint.R4.Coding{display: value}}, _]
} ->
value

_ ->
nil
end)
end

defp find_extension(extensions, :ethnicity) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{
url: "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity",
extension: [%EpiViewpoint.R4.Extension{url: "ombCategory", value_coding: %EpiViewpoint.R4.Coding{display: value}}, _]
} ->
value

_ ->
nil
end)
end

defp find_extension(extensions, :occupation) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{url: "http://hl7.org/fhir/StructureDefinition/patient-occupation", value_string: value} ->
value

_ ->
nil
end)
end

defp find_extension(extensions, :datereportedtolhd) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{url: "http://hl7.org/fhir/StructureDefinition/datereportedtolhd", value_date: value} ->
value

_ ->
nil
end)
end

defp join_resources(resources) do
patients = Map.get(resources, "Patient", [])
observations = Map.get(resources, "Observation", [])
organizations = Map.get(resources, "Organization", [])

joined = Enum.map(observations, &merge_resources(&1, patients, organizations))

{:ok, joined}
end

defp merge_resources(observation, patients, organizations) do
patient = Enum.find(patients, %{}, &(&1.caseid == observation.pat_id))
organization = Enum.find(organizations, %{}, &(&1.organization_id == observation.org_id))

observation
|> Map.merge(patient)
|> Map.merge(organization)
|> Map.drop([:pat_id, :org_id, :organization_id])
end

defp to_map(resources, contents) do
%{file_name: "load.bulk_fhir", contents: contents, list: resources}
end

defp format_date(date) when is_struct(date, Date), do: Calendar.strftime(date, "%m/%d/%Y")
defp format_date(datetime) when is_struct(datetime, DateTime), do: DateTime.to_date(datetime) |> format_date()
end
4 changes: 4 additions & 0 deletions lib/epiviewpoint/cases.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ defmodule EpiViewpoint.Cases do
def count_lab_results(), do: LabResult |> Repo.aggregate(:count)
def create_lab_result!({attrs, audit_meta}), do: %LabResult{} |> change_lab_result(attrs) |> AuditingRepo.insert!(audit_meta)
def import_lab_results(lab_result_data_file_string, originator), do: Import.import_data_file(lab_result_data_file_string, originator)

def import_bulk_fhir_lab_results(lab_result_data_file_list, originator),
do: Import.import_bulk_fhir_data_file(lab_result_data_file_list, originator)

def list_lab_results(), do: LabResult.Query.all() |> Repo.all()
def preload_initiating_lab_result(case_investigations_or_nil), do: case_investigations_or_nil |> Repo.preload(:initiating_lab_result)
def preload_lab_results(person_or_people_or_nil), do: person_or_people_or_nil |> Repo.preload(lab_results: LabResult.Query.display_order())
Expand Down
7 changes: 7 additions & 0 deletions lib/epiviewpoint/cases/import.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule EpiViewpoint.Cases.Import do
alias EpiViewpoint.Accounts
alias EpiViewpoint.AuditLog
alias EpiViewpoint.BulkFhirParser
alias EpiViewpoint.Cases
alias EpiViewpoint.Cases.Import
alias EpiViewpoint.Cases.LabResult
Expand Down Expand Up @@ -59,6 +60,11 @@ defmodule EpiViewpoint.Cases.Import do
]
end

def import_bulk_fhir_data_file(lab_result_data_file_list, originator) do
{:ok, bulk_fhir_data} = BulkFhirParser.parse_bulk_fhir(lab_result_data_file_list)
import_data_file(bulk_fhir_data, originator)
end

def import_data_file(_file, %{admin: false}), do: {:error, "Originator must be an admin"}

def import_data_file(file, %Accounts.User{} = originator) do
Expand Down Expand Up @@ -103,6 +109,7 @@ defmodule EpiViewpoint.Cases.Import do
case Path.extname(file.file_name) do
".csv" -> DataFile.read(file.contents, :csv, &rename_headers/1, @fields)
".ndjson" -> DataFile.read(file.contents, :ndjson, &rename_headers/1, @fields)
".bulk_fhir" -> DataFile.read(file.list, :bulk_fhir, &rename_headers/1, @fields)
_ -> {:error, "Unsupported file type: #{file.extension}"}
end
end
Expand Down
8 changes: 8 additions & 0 deletions lib/epiviewpoint/data_file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ defmodule EpiViewpoint.DataFile do
read(string, header_transformer, headers, &parse_ndjson/1)
end

def read(string, :bulk_fhir, header_transformer, headers) when is_list(string) do
read(string, header_transformer, headers, &parse_bulk_fhir/1)
end

def read(input, header_transformer, [required: required_headers, optional: optional_headers], parser) do
with {:ok, df} <- parser.(input),
{:ok, provided_headers} <- extract_provided_headers(df, header_transformer),
Expand Down Expand Up @@ -84,6 +88,10 @@ defmodule EpiViewpoint.DataFile do
DataFrame.load_ndjson(input)
end

defp parse_bulk_fhir(input) do
{:ok, DataFrame.new(input)}
end

defp validate_csv(input) do
EpiViewpoint.DataFile.Parser.parse_string(input, headers: true)
input
Expand Down
68 changes: 68 additions & 0 deletions lib/epiviewpoint/r4/address.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule EpiViewpoint.R4.Address do
use Ecto.Schema
import Ecto.Changeset

@fields [
:city,
:country,
:district,
:id,
:line,
:postal_code,
:state,
:text,
:type,
:use
]
@required_fields []

embedded_schema do
# Fields
field(:city, :string)
field(:country, :string)
field(:district, :string)
field(:postal_code, :string)
field(:state, :string)
field(:text, :string)

field(:line, {:array, :string})

# Enum
field(:type, Ecto.Enum,
values: [
:postal,
:physical,
:both
]
)

field(:use, Ecto.Enum,
values: [
:home,
:work,
:temp,
:old,
:billing
]
)

# Embed One
embeds_one(:period, EpiViewpoint.R4.Period)

# Embed Many
embeds_many(:extension, EpiViewpoint.R4.Extension)
end

def choices(_), do: nil

def version_namespace, do: EpiViewpoint.R4
def version, do: "R4"

def changeset(data \\ %__MODULE__{}, attrs) do
data
|> cast(attrs, @fields)
|> cast_embed(:period)
|> cast_embed(:extension)
|> validate_required(@required_fields)
end
end
Loading

0 comments on commit b28360b

Please sign in to comment.